diff --git a/system/hardware/esim.py b/system/hardware/esim.py index 9b7d4f9ec0b086..40600d26b565f5 100755 --- a/system/hardware/esim.py +++ b/system/hardware/esim.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import argparse -import time from openpilot.system.hardware import HARDWARE @@ -13,16 +12,13 @@ parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile') args = parser.parse_args() - mutated = False lpa = HARDWARE.get_sim_lpa() if args.switch: lpa.switch_profile(args.switch) - mutated = True elif args.delete: confirm = input('are you sure you want to delete this profile? (y/N) ') if confirm == 'y': lpa.delete_profile(args.delete) - mutated = True else: print('cancelled') exit(0) @@ -33,11 +29,6 @@ else: parser.print_help() - if mutated: - HARDWARE.reboot_modem() - # eUICC needs a small delay post-reboot before querying profiles - time.sleep(.5) - profiles = lpa.list_profiles() print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:') for p in profiles: diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index 2e7e6a0ba97366..93cbca2479a545 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -5,14 +5,17 @@ import math import os import serial +import subprocess import sys +import termios +import time from collections.abc import Generator from openpilot.system.hardware.base import LPABase, Profile -DEFAULT_DEVICE = "/dev/ttyUSB2" +DEFAULT_DEVICE = "/dev/modem_at0" DEFAULT_BAUD = 9600 DEFAULT_TIMEOUT = 5.0 # https://euicc-manual.osmocom.org/docs/lpa/applet-id/ @@ -20,6 +23,10 @@ MM = "org.freedesktop.ModemManager1" MM_MODEM = MM + ".Modem" ES10X_MSS = 120 +OPEN_ISDR_RETRIES = 10 +OPEN_ISDR_RETRY_DELAY_S = 0.25 +OPEN_ISDR_RESET_ATTEMPT = 5 +SEND_APDU_RETRIES = 3 DEBUG = os.environ.get("DEBUG") == "1" # TLV Tags @@ -36,28 +43,28 @@ def b64e(data: bytes) -> str: class AtClient: - def __init__(self, device: str, baud: int, timeout: float, debug: bool) -> None: - self.debug = debug + def __init__(self, device: str, baud: int, timeout: float) -> None: self.channel: str | None = None + self._device = device + self._baud = baud self._timeout = timeout self._serial: serial.Serial | None = None - try: - self._serial = serial.Serial(device, baudrate=baud, timeout=timeout) - self._serial.reset_input_buffer() - except (serial.SerialException, PermissionError, OSError): - pass + self._use_dbus = not os.path.exists(device) def close(self) -> None: try: if self.channel: - self.query(f"AT+CCHC={self.channel}") + try: + self.query(f"AT+CCHC={self.channel}") + except (RuntimeError, TimeoutError): + pass self.channel = None finally: if self._serial: self._serial.close() def _send(self, cmd: str) -> None: - if self.debug: + if DEBUG: print(f"SER >> {cmd}", file=sys.stderr) self._serial.write((cmd + "\r").encode("ascii")) @@ -70,7 +77,7 @@ def _expect(self) -> list[str]: line = raw.decode(errors="ignore").strip() if not line: continue - if self.debug: + if DEBUG: print(f"SER << {line}", file=sys.stderr) if line == "OK": return lines @@ -78,6 +85,18 @@ def _expect(self) -> list[str]: raise RuntimeError(f"AT command failed: {line}") lines.append(line) + def _ensure_serial(self, reconnect: bool = False) -> None: + if reconnect: + self.channel = None + try: + if self._serial: + self._serial.close() + except Exception: + pass + self._serial = None + if self._serial is None: + self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + def _get_modem(self): import dbus bus = dbus.SystemBus() @@ -87,48 +106,81 @@ def _get_modem(self): return bus.get_object(MM, modem_path) def _dbus_query(self, cmd: str) -> list[str]: - if self.debug: + if DEBUG: print(f"DBUS >> {cmd}", file=sys.stderr) try: result = str(self._get_modem().Command(cmd, math.ceil(self._timeout), dbus_interface=MM_MODEM, timeout=self._timeout)) except Exception as e: raise RuntimeError(f"AT command failed: {e}") from e lines = [line.strip() for line in result.splitlines() if line.strip()] - if self.debug: + if DEBUG: for line in lines: print(f"DBUS << {line}", file=sys.stderr) return lines def query(self, cmd: str) -> list[str]: - if self._serial: + if self._use_dbus: + return self._dbus_query(cmd) + self._ensure_serial() + try: + self._send(cmd) + return self._expect() + except serial.SerialException: + self._ensure_serial(reconnect=True) self._send(cmd) return self._expect() - return self._dbus_query(cmd) - def open_isdr(self) -> None: - # close any stale logical channel from a previous crashed session - try: - self.query("AT+CCHC=1") - except RuntimeError: - pass + def _open_isdr_once(self) -> None: + if self.channel: + try: + self.query(f"AT+CCHC={self.channel}") + except RuntimeError: + pass + self.channel = None + # drain any unsolicited responses before opening + if self._serial and not self._use_dbus: + try: + self._serial.reset_input_buffer() + except (OSError, serial.SerialException, termios.error): + self._ensure_serial(reconnect=True) for line in self.query(f'AT+CCHO="{ISDR_AID}"'): if line.startswith("+CCHO:") and (ch := line.split(":", 1)[1].strip()): self.channel = ch return raise RuntimeError("Failed to open ISD-R application") + def open_isdr(self) -> None: + for attempt in range(OPEN_ISDR_RETRIES): + try: + self._open_isdr_once() + return + except (RuntimeError, TimeoutError, termios.error, serial.SerialException): + time.sleep(OPEN_ISDR_RETRY_DELAY_S) + if attempt == OPEN_ISDR_RESET_ATTEMPT: + # reset modem via lte.sh + subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) + self._serial = None # serial port will be re-opened on next attempt + raise RuntimeError("Failed to open ISD-R after retries") + def send_apdu(self, apdu: bytes) -> tuple[bytes, int, int]: - if not self.channel: - raise RuntimeError("Logical channel is not open") - hex_payload = apdu.hex().upper() - for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'): - if line.startswith("+CGLA:"): - parts = line.split(":", 1)[1].split(",", 1) - if len(parts) == 2: - data = bytes.fromhex(parts[1].strip().strip('"')) - if len(data) >= 2: - return data[:-2], data[-2], data[-1] - raise RuntimeError("Missing +CGLA response") + for attempt in range(SEND_APDU_RETRIES): + try: + if not self.channel: + self.open_isdr() + hex_payload = apdu.hex().upper() + for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'): + if line.startswith("+CGLA:"): + parts = line.split(":", 1)[1].split(",", 1) + if len(parts) == 2: + data = bytes.fromhex(parts[1].strip().strip('"')) + if len(data) >= 2: + return data[:-2], data[-2], data[-1] + raise RuntimeError("Missing +CGLA response") + except (RuntimeError, ValueError): + self.channel = None + if attempt == SEND_APDU_RETRIES - 1: + raise + raise RuntimeError("send_apdu failed") # --- TLV utilities --- @@ -250,8 +302,7 @@ def __new__(cls): def __init__(self): if hasattr(self, '_client'): return - self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT, debug=DEBUG) - self._client.open_isdr() + self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT) atexit.register(self._client.close) def list_profiles(self) -> list[Profile]: