Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions system/hardware/esim.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3

import argparse
import time
from openpilot.system.hardware import HARDWARE


Expand All @@ -13,16 +12,13 @@
parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile')
args = parser.parse_args()

mutated = False
lpa = HARDWARE.get_sim_lpa()
if args.switch:
lpa.switch_profile(args.switch)
mutated = True
elif args.delete:
confirm = input('are you sure you want to delete this profile? (y/N) ')
if confirm == 'y':
lpa.delete_profile(args.delete)
mutated = True
else:
print('cancelled')
exit(0)
Expand All @@ -33,11 +29,6 @@
else:
parser.print_help()

if mutated:
HARDWARE.reboot_modem()
# eUICC needs a small delay post-reboot before querying profiles
time.sleep(.5)

profiles = lpa.list_profiles()
print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:')
for p in profiles:
Expand Down
119 changes: 85 additions & 34 deletions system/hardware/tici/lpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@
import math
import os
import serial
import subprocess
import sys
import termios
import time

from collections.abc import Generator

from openpilot.system.hardware.base import LPABase, Profile


DEFAULT_DEVICE = "/dev/ttyUSB2"
DEFAULT_DEVICE = "/dev/modem_at0"
DEFAULT_BAUD = 9600
DEFAULT_TIMEOUT = 5.0
# https://euicc-manual.osmocom.org/docs/lpa/applet-id/
ISDR_AID = "A0000005591010FFFFFFFF8900000100"
MM = "org.freedesktop.ModemManager1"
MM_MODEM = MM + ".Modem"
ES10X_MSS = 120
OPEN_ISDR_RETRIES = 10
OPEN_ISDR_RETRY_DELAY_S = 0.25
OPEN_ISDR_RESET_ATTEMPT = 5
SEND_APDU_RETRIES = 3
DEBUG = os.environ.get("DEBUG") == "1"

# TLV Tags
Expand All @@ -36,28 +43,28 @@ def b64e(data: bytes) -> str:


class AtClient:
def __init__(self, device: str, baud: int, timeout: float, debug: bool) -> None:
self.debug = debug
def __init__(self, device: str, baud: int, timeout: float) -> None:
self.channel: str | None = None
self._device = device
self._baud = baud
self._timeout = timeout
self._serial: serial.Serial | None = None
try:
self._serial = serial.Serial(device, baudrate=baud, timeout=timeout)
self._serial.reset_input_buffer()
except (serial.SerialException, PermissionError, OSError):
pass
self._use_dbus = not os.path.exists(device)

def close(self) -> None:
try:
if self.channel:
self.query(f"AT+CCHC={self.channel}")
try:
self.query(f"AT+CCHC={self.channel}")
except (RuntimeError, TimeoutError):
pass
self.channel = None
finally:
if self._serial:
self._serial.close()

def _send(self, cmd: str) -> None:
if self.debug:
if DEBUG:
print(f"SER >> {cmd}", file=sys.stderr)
self._serial.write((cmd + "\r").encode("ascii"))

Expand All @@ -70,14 +77,26 @@ def _expect(self) -> list[str]:
line = raw.decode(errors="ignore").strip()
if not line:
continue
if self.debug:
if DEBUG:
print(f"SER << {line}", file=sys.stderr)
if line == "OK":
return lines
if line == "ERROR" or line.startswith("+CME ERROR"):
raise RuntimeError(f"AT command failed: {line}")
lines.append(line)

def _ensure_serial(self, reconnect: bool = False) -> None:
if reconnect:
self.channel = None
try:
if self._serial:
self._serial.close()
except Exception:
pass
self._serial = None
if self._serial is None:
self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout)

def _get_modem(self):
import dbus
bus = dbus.SystemBus()
Expand All @@ -87,48 +106,81 @@ def _get_modem(self):
return bus.get_object(MM, modem_path)

def _dbus_query(self, cmd: str) -> list[str]:
if self.debug:
if DEBUG:
print(f"DBUS >> {cmd}", file=sys.stderr)
try:
result = str(self._get_modem().Command(cmd, math.ceil(self._timeout), dbus_interface=MM_MODEM, timeout=self._timeout))
except Exception as e:
raise RuntimeError(f"AT command failed: {e}") from e
lines = [line.strip() for line in result.splitlines() if line.strip()]
if self.debug:
if DEBUG:
for line in lines:
print(f"DBUS << {line}", file=sys.stderr)
return lines

def query(self, cmd: str) -> list[str]:
if self._serial:
if self._use_dbus:
return self._dbus_query(cmd)
self._ensure_serial()
try:
self._send(cmd)
return self._expect()
except serial.SerialException:
self._ensure_serial(reconnect=True)
self._send(cmd)
return self._expect()
return self._dbus_query(cmd)

def open_isdr(self) -> None:
# close any stale logical channel from a previous crashed session
try:
self.query("AT+CCHC=1")
except RuntimeError:
pass
def _open_isdr_once(self) -> None:
if self.channel:
try:
self.query(f"AT+CCHC={self.channel}")
except RuntimeError:
pass
self.channel = None
# drain any unsolicited responses before opening
if self._serial and not self._use_dbus:
try:
self._serial.reset_input_buffer()
except (OSError, serial.SerialException, termios.error):
self._ensure_serial(reconnect=True)
for line in self.query(f'AT+CCHO="{ISDR_AID}"'):
if line.startswith("+CCHO:") and (ch := line.split(":", 1)[1].strip()):
self.channel = ch
return
raise RuntimeError("Failed to open ISD-R application")

def open_isdr(self) -> None:
for attempt in range(OPEN_ISDR_RETRIES):
try:
self._open_isdr_once()
return
except (RuntimeError, TimeoutError, termios.error, serial.SerialException):
time.sleep(OPEN_ISDR_RETRY_DELAY_S)
if attempt == OPEN_ISDR_RESET_ATTEMPT:
# reset modem via lte.sh
subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True)
self._serial = None # serial port will be re-opened on next attempt
raise RuntimeError("Failed to open ISD-R after retries")

def send_apdu(self, apdu: bytes) -> tuple[bytes, int, int]:
if not self.channel:
raise RuntimeError("Logical channel is not open")
hex_payload = apdu.hex().upper()
for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'):
if line.startswith("+CGLA:"):
parts = line.split(":", 1)[1].split(",", 1)
if len(parts) == 2:
data = bytes.fromhex(parts[1].strip().strip('"'))
if len(data) >= 2:
return data[:-2], data[-2], data[-1]
raise RuntimeError("Missing +CGLA response")
for attempt in range(SEND_APDU_RETRIES):
try:
if not self.channel:
self.open_isdr()
hex_payload = apdu.hex().upper()
for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'):
if line.startswith("+CGLA:"):
parts = line.split(":", 1)[1].split(",", 1)
if len(parts) == 2:
data = bytes.fromhex(parts[1].strip().strip('"'))
if len(data) >= 2:
return data[:-2], data[-2], data[-1]
raise RuntimeError("Missing +CGLA response")
except (RuntimeError, ValueError):
self.channel = None
if attempt == SEND_APDU_RETRIES - 1:
raise
raise RuntimeError("send_apdu failed")


# --- TLV utilities ---
Expand Down Expand Up @@ -250,8 +302,7 @@ def __new__(cls):
def __init__(self):
if hasattr(self, '_client'):
return
self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT, debug=DEBUG)
self._client.open_isdr()
self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT)
atexit.register(self._client.close)

def list_profiles(self) -> list[Profile]:
Expand Down
Loading