diff --git a/pyproject.toml b/pyproject.toml index a1123234001436..486c6cfb569901 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,6 +74,7 @@ dependencies = [ # ui "raylib > 5.5.0.3", + "zxing-cpp==3.0.0", "qrcode", "jeepney", "pillow", @@ -147,7 +148,7 @@ testpaths = [ [tool.codespell] quiet-level = 3 # if you've got a short variable name that's getting flagged, add it here -ignore-words-list = "bu,ro,te,ue,alo,hda,ois,nam,nams,ned,som,parm,setts,inout,warmup,bumb,nd,sie,preints,whit,indexIn,ws,uint,grey,deque,stdio,amin,BA,LITE,atEnd,UIs,errorString,arange,FocusIn,od,tim,relA,hist,copyable,jupyter,thead,TGE,abl,lite,ser" +ignore-words-list = "bu,ro,te,ue,alo,hda,ois,nam,nams,ned,som,parm,setts,inout,warmup,bumb,nd,sie,preints,whit,indexIn,ws,uint,grey,deque,stdio,amin,BA,LITE,atEnd,UIs,errorString,arange,FocusIn,od,tim,relA,hist,copyable,jupyter,thead,TGE,abl,lite,ot,ser" builtin = "clear,rare,informal,code,names,en-GB_to_en-US" skip = "./third_party/*, ./tinygrad/*, ./tinygrad_repo/*, ./msgq/*, ./panda/*, ./opendbc/*, ./opendbc_repo/*, ./rednose/*, ./rednose_repo/*, ./teleoprtc/*, ./teleoprtc_repo/*, *.po, uv.lock, *.onnx, ./cereal/gen/*, */c_generated_code/*, docs/assets/*, tools/plotjuggler/layouts/*, selfdrive/assets/offroad/mici_fcc.html" diff --git a/selfdrive/pandad/pandad.cc b/selfdrive/pandad/pandad.cc index 28d459f458aed3..257e1a4600f42f 100644 --- a/selfdrive/pandad/pandad.cc +++ b/selfdrive/pandad/pandad.cc @@ -326,7 +326,9 @@ void process_peripheral_state(Panda *panda, PubMaster *pm, bool no_fan_control) cur_integ_lines = (driver_view ? integ_lines_filter_driver_view : integ_lines_filter).update(cur_integ_lines); last_driver_camera_t = event.getLogMonoTime(); - if (cur_integ_lines <= CUTOFF_IL) { + if (driver_view) { + ir_pwr = 0; + } else if (cur_integ_lines <= CUTOFF_IL) { ir_pwr = 0; } else if (cur_integ_lines > SATURATE_IL) { ir_pwr = 100; diff --git a/selfdrive/ui/mici/layouts/settings/network/__init__.py b/selfdrive/ui/mici/layouts/settings/network/__init__.py index ddbab4b478cb24..defc486fc39265 100644 --- a/selfdrive/ui/mici/layouts/settings/network/__init__.py +++ b/selfdrive/ui/mici/layouts/settings/network/__init__.py @@ -1,10 +1,58 @@ import pyray as rl +from cereal import log +from openpilot.system.ui.lib.cellular_manager import CellularManager from openpilot.selfdrive.ui.mici.layouts.settings.network.wifi_ui import WifiIcon from openpilot.selfdrive.ui.mici.widgets.button import BigButton +from openpilot.selfdrive.ui.ui_state import ui_state from openpilot.system.ui.lib.application import gui_app from openpilot.system.ui.lib.wifi_manager import WifiManager, ConnectStatus, SecurityType, normalize_ssid +NetworkType = log.DeviceState.NetworkType + + +class ESimNetworkButton(BigButton): + def __init__(self, cellular_manager: CellularManager): + self._cellular_manager = cellular_manager + self._cell_none_icon = gui_app.texture("icons_mici/settings/network/cell_strength_none.png", 64, 47) + self._cell_low_icon = gui_app.texture("icons_mici/settings/network/cell_strength_low.png", 64, 47) + self._cell_medium_icon = gui_app.texture("icons_mici/settings/network/cell_strength_medium.png", 64, 47) + self._cell_high_icon = gui_app.texture("icons_mici/settings/network/cell_strength_high.png", 64, 47) + self._cell_full_icon = gui_app.texture("icons_mici/settings/network/cell_strength_full.png", 64, 47) + super().__init__("esim", "no active profile", self._cell_none_icon) + + def _update_state(self): + super()._update_state() + + if self._cellular_manager.busy: + self.set_text("esim") + self.set_value("switching...") + self.set_icon(self._cell_none_icon) + else: + active = next((p for p in self._cellular_manager.profiles if p.enabled), None) + if active: + name = active.nickname or active.provider or active.iccid[:12] + self.set_text(f"{name} (...{active.iccid[-4:]})") + self.set_value(self._cellular_manager.modem_ip or "obtaining IP...") + self.set_icon(self._get_cell_icon()) + else: + self.set_text("esim") + self.set_value("no active profile") + self.set_icon(self._cell_none_icon) + + def _get_cell_icon(self): + device_state = ui_state.sm['deviceState'] + net_type = device_state.networkType + if net_type not in (NetworkType.cell2G, NetworkType.cell3G, NetworkType.cell4G, NetworkType.cell5G): + return self._cell_none_icon + strength = device_state.networkStrength + level = max(0, min(5, strength.raw + 1)) if strength.raw > 0 else 0 + return {0: self._cell_none_icon, + 2: self._cell_low_icon, + 3: self._cell_medium_icon, + 4: self._cell_high_icon, + 5: self._cell_full_icon}.get(level, self._cell_none_icon) + class WifiNetworkButton(BigButton): def __init__(self, wifi_manager: WifiManager): diff --git a/selfdrive/ui/mici/layouts/settings/network/esim_ui.py b/selfdrive/ui/mici/layouts/settings/network/esim_ui.py new file mode 100644 index 00000000000000..6b9b9c7924a5c4 --- /dev/null +++ b/selfdrive/ui/mici/layouts/settings/network/esim_ui.py @@ -0,0 +1,370 @@ +import threading +import urllib.request + +import zxingcpp +import pyray as rl +from collections.abc import Callable +from msgq.visionipc import VisionStreamType + +from openpilot.system.ui.lib.cellular_manager import CellularManager +from openpilot.selfdrive.ui.mici.onroad.cameraview import CameraView +from openpilot.selfdrive.ui.mici.widgets.button import BigButton, LABEL_COLOR +from openpilot.selfdrive.ui.mici.widgets.dialog import BigConfirmationDialog, BigDialog, BigInputDialog +from openpilot.selfdrive.ui.ui_state import ui_state +from openpilot.system.hardware.base import Profile +from openpilot.system.ui.lib.application import gui_app, FontWeight, MousePos +from openpilot.system.ui.lib.multilang import tr +from openpilot.system.ui.widgets import Widget +from openpilot.system.ui.widgets.label import gui_label +from openpilot.system.ui.widgets.nav_widget import NavWidget +from openpilot.system.ui.widgets.scroller import NavScroller + + +class DeleteButton(Widget): + MARGIN = 12 + + def __init__(self, delete_callback: Callable): + super().__init__() + self._delete_callback = delete_callback + + self._bg_txt = gui_app.texture("icons_mici/settings/network/new/forget_button.png", 84, 84) + self._bg_pressed_txt = gui_app.texture("icons_mici/settings/network/new/forget_button_pressed.png", 84, 84) + self._trash_txt = gui_app.texture("icons_mici/settings/network/new/trash.png", 29, 35) + self.set_rect(rl.Rectangle(0, 0, 84 + self.MARGIN * 2, 84 + self.MARGIN * 2)) + + def _handle_mouse_release(self, mouse_pos: MousePos): + super()._handle_mouse_release(mouse_pos) + dlg = BigConfirmationDialog("slide to delete", gui_app.texture("icons_mici/settings/network/new/trash.png", 54, 64), + self._delete_callback, red=True) + gui_app.push_widget(dlg) + + def _render(self, _): + bg_txt = self._bg_pressed_txt if self.is_pressed else self._bg_txt + rl.draw_texture_ex(bg_txt, (self._rect.x + (self._rect.width - self._bg_txt.width) / 2, + self._rect.y + (self._rect.height - self._bg_txt.height) / 2), 0, 1.0, rl.WHITE) + + trash_x = self._rect.x + (self._rect.width - self._trash_txt.width) / 2 + trash_y = self._rect.y + (self._rect.height - self._trash_txt.height) / 2 + rl.draw_texture_ex(self._trash_txt, (trash_x, trash_y), 0, 1.0, rl.WHITE) + + +def _profile_display_name(profile: Profile) -> str: + name = profile.nickname or profile.provider or profile.iccid[:12] + suffix = profile.iccid[-4:] + return f"{name} (...{suffix})" + + +def _is_valid_lpa_code(text: str) -> bool: + if not text.startswith("LPA:"): + return False + parts = text[4:].split("$") + return len(parts) == 3 and all(parts) + + +class QRScannerDialog(NavWidget): + def __init__(self, on_qr_detected: Callable[[str], None]): + super().__init__() + self._on_qr_detected = on_qr_detected + self._camera_view = CameraView("camerad", VisionStreamType.VISION_STREAM_DRIVER) + self._detected = False + self.set_rect(rl.Rectangle(0, 0, gui_app.width, gui_app.height)) + + def show_event(self): + super().show_event() + ui_state.params.put_bool("IsDriverViewEnabled", True) + + def hide_event(self): + super().hide_event() + ui_state.params.put_bool("IsDriverViewEnabled", False) + + def __del__(self): + self._camera_view.close() + + def _update_state(self): + super()._update_state() + self._camera_view._update_state() + + if self._detected or not self._camera_view.frame: + return + + frame = self._camera_view.frame + gray = frame.data[:frame.height * frame.stride].reshape(frame.height, frame.stride)[:, :frame.width] + results = zxingcpp.read_barcodes(gray) + if results: + data = results[0].text + if _is_valid_lpa_code(data): + self._detected = True + self.dismiss(lambda: self._on_qr_detected(data)) + + def _render(self, rect): + rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height)) + self._camera_view._render(rect) + + if not self._camera_view.frame: + gui_label(rect, tr("camera starting"), font_size=54, font_weight=FontWeight.BOLD, + alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER) + else: + label_y = rect.y + rect.height * 3 / 4 + label_rect = rl.Rectangle(rect.x, label_y + (rect.height - label_y) / 2 - 20, rect.width, 40) + gui_label(label_rect, "hold QR code to camera", font_size=32, font_weight=FontWeight.MEDIUM, + alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER, + color=rl.Color(255, 255, 255, int(255 * 0.9))) + + rl.end_scissor_mode() + + +class InstallingProfileDialog(BigDialog): + DOT_STEP = 0.6 + + def __init__(self): + super().__init__("installing profile", "please wait...") + self._show_time = 0.0 + + def show_event(self): + super().show_event() + self._nav_bar._alpha = 0.0 + self._show_time = rl.get_time() + + def _back_enabled(self) -> bool: + return False + + def _render(self, _): + t = (rl.get_time() - self._show_time) % (self.DOT_STEP * 2) + dots = "." * min(int(t / (self.DOT_STEP / 4)), 3) + self._card.set_value(f"please wait{dots}") + super()._render(_) + + +class ESimProfileButton(BigButton): + LABEL_PADDING = 98 + LABEL_WIDTH = 402 - 98 - 28 + SUB_LABEL_WIDTH = 402 - BigButton.LABEL_HORIZONTAL_PADDING * 2 + + def __init__(self, profile: Profile, cellular_manager: CellularManager): + self._cellular_manager = cellular_manager + is_comma = cellular_manager.is_comma_profile(profile.iccid) + display_name = "comma prime" if is_comma else _profile_display_name(profile) + super().__init__(display_name, scroll=True) + + self._profile = profile + self._deleting = False + + self._cell_full_txt = gui_app.texture("icons_mici/settings/network/cell_strength_full.png", 48, 36) + self._cell_none_txt = gui_app.texture("icons_mici/settings/network/cell_strength_none.png", 48, 36) + self._check_txt = gui_app.texture("icons_mici/setup/driver_monitoring/dm_check.png", 32, 32) + self._comma_txt = gui_app.texture("icons_mici/settings/comma_icon.png", 36, 36) if is_comma else None + + self._delete_btn = DeleteButton(self._on_delete) + + @property + def profile(self) -> Profile: + return self._profile + + def update_profile(self, profile: Profile): + self._profile = profile + self._deleting = False + is_comma = self._cellular_manager.is_comma_profile(profile.iccid) + self.set_text("comma prime" if is_comma else _profile_display_name(profile)) + + @property + def _show_delete_btn(self) -> bool: + if self._deleting or self._profile.enabled or self._cellular_manager.busy: + return False + return not self._cellular_manager.is_comma_profile(self._profile.iccid) + + def _on_delete(self): + if self._deleting: + return + self._deleting = True + self._cellular_manager.delete_profile(self._profile.iccid) + + def _handle_mouse_release(self, mouse_pos: MousePos): + if self._show_delete_btn and rl.check_collision_point_rec(mouse_pos, self._delete_btn.rect): + return + super()._handle_mouse_release(mouse_pos) + + def _get_label_font_size(self): + return 48 + + def _draw_content(self, btn_y: float): + self._label.set_color(LABEL_COLOR) + label_rect = rl.Rectangle(self._rect.x + self.LABEL_PADDING, btn_y + self.LABEL_VERTICAL_PADDING, + self.LABEL_WIDTH, self._rect.height - self.LABEL_VERTICAL_PADDING * 2) + self._label.render(label_rect) + + if self.value: + sub_label_x = self._rect.x + self.LABEL_HORIZONTAL_PADDING + label_y = btn_y + self._rect.height - self.LABEL_VERTICAL_PADDING + sub_label_w = self.SUB_LABEL_WIDTH - (self._delete_btn.rect.width if self._show_delete_btn else 0) + sub_label_height = self._sub_label.get_content_height(sub_label_w) + + if self._profile.enabled and not self._deleting: + check_y = int(label_y - sub_label_height + (sub_label_height - self._check_txt.height) / 2) + rl.draw_texture_ex(self._check_txt, rl.Vector2(sub_label_x, check_y), 0.0, 1.0, rl.Color(255, 255, 255, int(255 * 0.9 * 0.65))) + sub_label_x += self._check_txt.width + 14 + + sub_label_rect = rl.Rectangle(sub_label_x, label_y - sub_label_height, sub_label_w, sub_label_height) + self._sub_label.render(sub_label_rect) + + # Cell icon (comma icon for comma profiles) + if self._comma_txt: + rl.draw_texture_ex(self._comma_txt, (self._rect.x + 36, btn_y + 30, ), 0.0, 1.0, rl.WHITE) + else: + cell_icon = self._cell_full_txt if self._profile.enabled else self._cell_none_txt + rl.draw_texture_ex(cell_icon, (self._rect.x + 30, btn_y + 30, ), 0.0, 1.0, rl.WHITE) + + # Delete button + if self._show_delete_btn: + self._delete_btn.render(rl.Rectangle( + self._rect.x + self._rect.width - self._delete_btn.rect.width, + btn_y + self._rect.height - self._delete_btn.rect.height, + self._delete_btn.rect.width, + self._delete_btn.rect.height, + )) + + def set_touch_valid_callback(self, touch_callback: Callable[[], bool]) -> None: + super().set_touch_valid_callback(lambda: touch_callback() and not self._delete_btn.is_pressed) + self._delete_btn.set_touch_valid_callback(touch_callback) + + def _update_state(self): + super()._update_state() + + if self._cellular_manager.busy or self._deleting: + self.set_enabled(False) + self._sub_label.set_color(rl.Color(255, 255, 255, int(255 * 0.585))) + self._sub_label.set_font_weight(FontWeight.ROMAN) + + if self._deleting: + self.set_value("deleting...") + elif self._cellular_manager.busy: + self.set_value("switching..." if not self._profile.enabled else "active") + elif self._profile.enabled: + self.set_value("active") + self.set_enabled(True) + self._sub_label.set_color(rl.Color(255, 255, 255, int(255 * 0.585))) + self._sub_label.set_font_weight(FontWeight.ROMAN) + else: + self.set_value("switch") + self.set_enabled(True) + self._sub_label.set_color(rl.Color(255, 255, 255, int(255 * 0.9))) + self._sub_label.set_font_weight(FontWeight.SEMI_BOLD) + + +class ESimUIMici(NavScroller): + def __init__(self, cellular_manager: CellularManager): + super().__init__() + + self._cellular_manager = cellular_manager + self._add_profile_btn = BigButton("add profile", "scan QR code") + self._add_profile_btn.set_click_callback(self._on_add_profile) + self._installing_dialog: InstallingProfileDialog | None = None + + self._cellular_manager.add_callbacks( + profiles_updated=self._on_profiles_updated, + operation_error=self._on_error, + ) + + def show_event(self): + super().show_event() + self._on_profiles_updated(self._cellular_manager.profiles) + self._cellular_manager.refresh_profiles() + + def _on_profiles_updated(self, profiles: list[Profile]): + if self._installing_dialog: + self._installing_dialog.dismiss() + self._installing_dialog = None + + existing = {btn.profile.iccid: btn for btn in self._scroller.items if isinstance(btn, ESimProfileButton)} + + current_iccids = {p.iccid for p in profiles} + + # Update existing and add new + for profile in profiles: + if profile.iccid in existing: + existing[profile.iccid].update_profile(profile) + else: + btn = ESimProfileButton(profile, self._cellular_manager) + btn.set_click_callback(lambda iccid=profile.iccid: self._on_profile_clicked(iccid)) + self._scroller.add_widget(btn) + + # Remove deleted profiles + self._scroller.items[:] = [ + btn for btn in self._scroller.items + if not isinstance(btn, ESimProfileButton) or btn.profile.iccid in current_iccids + ] + + # Keep add button at the end + if self._add_profile_btn in self._scroller.items: + self._scroller.items.append(self._scroller.items.pop(self._scroller.items.index(self._add_profile_btn))) + else: + self._scroller.add_widget(self._add_profile_btn) + + def _move_profile_to_front(self, iccid: str | None, scroll: bool = False): + if iccid is None: + return + + front_btn_idx = next((i for i, btn in enumerate(self._scroller.items) + if isinstance(btn, ESimProfileButton) and + btn.profile.iccid == iccid), None) + + if front_btn_idx is not None and front_btn_idx > 0: + self._scroller.move_item(front_btn_idx, 0) + + if scroll: + self._scroller.scroll_to(self._scroller.scroll_panel.get_offset(), smooth=True) + + def _update_state(self): + super()._update_state() + self._add_profile_btn.set_enabled(not self._cellular_manager.busy) + + # Keep the switching/active profile at the front with animation + iccid = self._cellular_manager.switching_iccid + if iccid is None: + active = next((p for p in self._cellular_manager.profiles if p.enabled), None) + iccid = active.iccid if active else None + self._move_profile_to_front(iccid) + + def _on_add_profile(self): + scanner = QRScannerDialog(on_qr_detected=self._on_qr_scanned) + gui_app.push_widget(scanner) + + def _on_qr_scanned(self, lpa_code: str): + self._pending_lpa_code = lpa_code + dlg = BigInputDialog("enter a nickname...", minimum_length=0, + confirm_callback=self._on_nickname_for_new_profile) + gui_app.push_widget(dlg) + + def _on_nickname_for_new_profile(self, nickname: str): + self._pending_nickname = nickname.strip() or None + self._installing_dialog = InstallingProfileDialog() + gui_app.push_widget(self._installing_dialog) + + def check_connectivity(): + try: + req = urllib.request.Request("https://openpilot.comma.ai", method="HEAD") + urllib.request.urlopen(req, timeout=2.0) + connected = True + except Exception: + connected = False + self._cellular_manager._callback_queue.append( + lambda: self._cellular_manager.download_profile(self._pending_lpa_code, self._pending_nickname) if connected + else self._on_error("no internet connection\nconnect to wifi or\ncellular to install") + ) + + threading.Thread(target=check_connectivity, daemon=True).start() + + def _on_error(self, error: str): + if self._installing_dialog: + self._installing_dialog.dismiss(lambda: gui_app.push_widget(BigDialog("esim error", error))) + self._installing_dialog = None + else: + dlg = BigDialog("esim error", error) + gui_app.push_widget(dlg) + + def _on_profile_clicked(self, iccid: str): + profile = next((p for p in self._cellular_manager.profiles if p.iccid == iccid), None) + if profile is None or profile.enabled: + return + + self._cellular_manager.switch_profile(iccid) + self._move_profile_to_front(iccid, scroll=True) diff --git a/selfdrive/ui/mici/layouts/settings/network/network_layout.py b/selfdrive/ui/mici/layouts/settings/network/network_layout.py index 9f6fae4b5f8958..757d69c6412758 100644 --- a/selfdrive/ui/mici/layouts/settings/network/network_layout.py +++ b/selfdrive/ui/mici/layouts/settings/network/network_layout.py @@ -1,5 +1,7 @@ from openpilot.system.ui.widgets.scroller import NavScroller -from openpilot.selfdrive.ui.mici.layouts.settings.network import WifiNetworkButton +from openpilot.selfdrive.ui.mici.layouts.settings.network import ESimNetworkButton, WifiNetworkButton +from openpilot.system.ui.lib.cellular_manager import CellularManager +from openpilot.selfdrive.ui.mici.layouts.settings.network.esim_ui import ESimUIMici from openpilot.selfdrive.ui.mici.layouts.settings.network.wifi_ui import WifiUIMici from openpilot.selfdrive.ui.mici.widgets.button import BigButton, BigMultiToggle, BigParamControl, BigToggle from openpilot.selfdrive.ui.mici.widgets.dialog import BigInputDialog @@ -64,6 +66,12 @@ def network_metered_callback(value: str): self._wifi_button = WifiNetworkButton(self._wifi_manager) self._wifi_button.set_click_callback(lambda: gui_app.push_widget(self._wifi_ui)) + # ******** eSIM ******** + self._cellular_manager = CellularManager() + self._esim_ui = ESimUIMici(self._cellular_manager) + self._esim_button = ESimNetworkButton(self._cellular_manager) + self._esim_button.set_click_callback(lambda: gui_app.push_widget(self._esim_ui)) + # ******** Advanced settings ******** # ******** Roaming toggle ******** self._roaming_btn = BigParamControl("enable roaming", "GsmRoaming", toggle_callback=self._toggle_roaming) @@ -78,6 +86,7 @@ def network_metered_callback(value: str): # Main scroller ---------------------------------- self._scroller.add_widgets([ self._wifi_button, + self._esim_button, self._network_metered_btn, self._tethering_toggle_btn, self._tethering_password_btn, @@ -99,22 +108,25 @@ def _update_state(self): # If not using prime SIM, show GSM settings and enable IPv4 forwarding show_cell_settings = ui_state.prime_state.get_type() in (PrimeType.NONE, PrimeType.LITE) self._wifi_manager.set_ipv4_forward(show_cell_settings) - self._roaming_btn.set_visible(show_cell_settings) + self._roaming_btn.set_visible(True) self._apn_btn.set_visible(show_cell_settings) self._cellular_metered_btn.set_visible(show_cell_settings) def show_event(self): super().show_event() self._wifi_manager.set_active(True) + self._cellular_manager.refresh_profiles() - # Process wifi callbacks while at any point in the nav stack + # Process wifi and esim callbacks while at any point in the nav stack gui_app.add_nav_stack_tick(self._wifi_manager.process_callbacks) + gui_app.add_nav_stack_tick(self._cellular_manager.process_callbacks) def hide_event(self): super().hide_event() self._wifi_manager.set_active(False) gui_app.remove_nav_stack_tick(self._wifi_manager.process_callbacks) + gui_app.remove_nav_stack_tick(self._cellular_manager.process_callbacks) def _toggle_roaming(self, checked: bool): self._wifi_manager.update_gsm_settings(checked, ui_state.params.get("GsmApn") or "", ui_state.params.get_bool("GsmMetered")) diff --git a/system/hardware/base.py b/system/hardware/base.py index 1a19f908c62971..caafbb9cf07906 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -91,8 +91,11 @@ def nickname_profile(self, iccid: str, nickname: str) -> None: def switch_profile(self, iccid: str) -> None: pass + def process_notifications(self) -> None: + pass + def is_comma_profile(self, iccid: str) -> bool: - return any(iccid.startswith(prefix) for prefix in ('8985235',)) + return any(p.iccid == iccid and p.provider == 'Webbing' for p in self.list_profiles()) class HardwareBase(ABC): @staticmethod diff --git a/system/hardware/esim.py b/system/hardware/esim.py index 9b7d4f9ec0b086..c9a92fa6362073 100755 --- a/system/hardware/esim.py +++ b/system/hardware/esim.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import argparse -import time from openpilot.system.hardware import HARDWARE @@ -33,11 +32,6 @@ else: parser.print_help() - if mutated: - HARDWARE.reboot_modem() - # eUICC needs a small delay post-reboot before querying profiles - time.sleep(.5) - profiles = lpa.list_profiles() print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:') for p in profiles: diff --git a/system/hardware/tici/gsma_ci_bundle.pem b/system/hardware/tici/gsma_ci_bundle.pem new file mode 100644 index 00000000000000..3ee7fd12521b12 --- /dev/null +++ b/system/hardware/tici/gsma_ci_bundle.pem @@ -0,0 +1,133 @@ +# GSMA Certificate Issuer (CI) bundle for eSIM RSP +# Source: https://euicc-manual.osmocom.org/docs/pki/ci/bundle.pem + +issuer= + countryName = CH + organizationName = OISTE Foundation + commonName = OISTE GSMA CI G1 +notBefore=2024-01-16 23:17:39Z +notAfter=2059-01-07 23:17:38Z +-----BEGIN CERTIFICATE----- +MIIB9zCCAZ2gAwIBAgIUSpBSCCDYPOEG/IFHUCKpZ2pIAQMwCgYIKoZIzj0EAwIw +QzELMAkGA1UEBhMCQ0gxGTAXBgNVBAoMEE9JU1RFIEZvdW5kYXRpb24xGTAXBgNV +BAMMEE9JU1RFIEdTTUEgQ0kgRzEwIBcNMjQwMTE2MjMxNzM5WhgPMjA1OTAxMDcy +MzE3MzhaMEMxCzAJBgNVBAYTAkNIMRkwFwYDVQQKDBBPSVNURSBGb3VuZGF0aW9u +MRkwFwYDVQQDDBBPSVNURSBHU01BIENJIEcxMFkwEwYHKoZIzj0CAQYIKoZIzj0D +AQcDQgAEvZ3s3PFC4NgrCcCMmHJ6DJ66uzAHuLcvjJnOn+TtBNThS7YHLDyHCa2v +7D+zTP+XTtgqgcLoB56Gha9EQQQ4xKNtMGswDwYDVR0TAQH/BAUwAwEB/zAQBgNV +HREECTAHiAVghXQFDjAXBgNVHSABAf8EDTALMAkGB2eBEgECAQAwHQYDVR0OBBYE +FEwnlnrSDBSzkelgHkHmBK1XwCIvMA4GA1UdDwEB/wQEAwIBBjAKBggqhkjOPQQD +AgNIADBFAiBVcywTj017jKpAQ+gwy4MqK2hQvzve6lkvQkgSP6ykHwIhAI0KFwCD +jnPbmcJsG41hUrWNlf+IcrMvFuYii0DasBNi +-----END CERTIFICATE----- +issuer= + organizationName = GSM Association + commonName = GSM Association - RSP2 Root CI1 +notBefore=2017-02-22 00:00:00Z +notAfter=2052-02-21 23:59:59Z +-----BEGIN CERTIFICATE----- +MIICSTCCAe+gAwIBAgIQbmhWeneg7nyF7hg5Y9+qejAKBggqhkjOPQQDAjBEMRgw +FgYDVQQKEw9HU00gQXNzb2NpYXRpb24xKDAmBgNVBAMTH0dTTSBBc3NvY2lhdGlv +biAtIFJTUDIgUm9vdCBDSTEwIBcNMTcwMjIyMDAwMDAwWhgPMjA1MjAyMjEyMzU5 +NTlaMEQxGDAWBgNVBAoTD0dTTSBBc3NvY2lhdGlvbjEoMCYGA1UEAxMfR1NNIEFz +c29jaWF0aW9uIC0gUlNQMiBSb290IENJMTBZMBMGByqGSM49AgEGCCqGSM49AwEH +A0IABJ1qutL0HCMX52GJ6/jeibsAqZfULWj/X10p/Min6seZN+hf5llovbCNuB2n +unLz+O8UD0SUCBUVo8e6n9X1TuajgcAwgb0wDgYDVR0PAQH/BAQDAgEGMA8GA1Ud +EwEB/wQFMAMBAf8wEwYDVR0RBAwwCogIKwYBBAGC6WAwFwYDVR0gAQH/BA0wCzAJ +BgdngRIBAgEAME0GA1UdHwRGMEQwQqBAoD6GPGh0dHA6Ly9nc21hLWNybC5zeW1h +dXRoLmNvbS9vZmZsaW5lY2EvZ3NtYS1yc3AyLXJvb3QtY2kxLmNybDAdBgNVHQ4E +FgQUgTcPUSXQsdQI1MOyMubSXnlb6/swCgYIKoZIzj0EAwIDSAAwRQIgIJdYsOMF +WziPK7l8nh5mu0qiRiVf25oa9ullG/OIASwCIQDqCmDrYf+GziHXBOiwJwnBaeBO +aFsiLzIEOaUuZwdNUw== +-----END CERTIFICATE----- +issuer= + countryName = US + organizationName = Entrust, Inc. + organizationalUnitName = See www.entrust.net/legal-terms + organizationalUnitName = (c) 2016 Entrust, Inc. - for authorized use only + commonName = Entrust eSIM Certification Authority +notBefore=2016-11-16 16:04:02Z +notAfter=2051-10-16 16:34:02Z +-----BEGIN CERTIFICATE----- +MIIC6DCCAo2gAwIBAgIRAIy4GT7M5nHsAAAAAFgsinowCgYIKoZIzj0EAwIwgbkx +CzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1FbnRydXN0LCBJbmMuMSgwJgYDVQQLEx9T +ZWUgd3d3LmVudHJ1c3QubmV0L2xlZ2FsLXRlcm1zMTkwNwYDVQQLEzAoYykgMjAx +NiBFbnRydXN0LCBJbmMuIC0gZm9yIGF1dGhvcml6ZWQgdXNlIG9ubHkxLTArBgNV +BAMTJEVudHJ1c3QgZVNJTSBDZXJ0aWZpY2F0aW9uIEF1dGhvcml0eTAgFw0xNjEx +MTYxNjA0MDJaGA8yMDUxMTAxNjE2MzQwMlowgbkxCzAJBgNVBAYTAlVTMRYwFAYD +VQQKEw1FbnRydXN0LCBJbmMuMSgwJgYDVQQLEx9TZWUgd3d3LmVudHJ1c3QubmV0 +L2xlZ2FsLXRlcm1zMTkwNwYDVQQLEzAoYykgMjAxNiBFbnRydXN0LCBJbmMuIC0g +Zm9yIGF1dGhvcml6ZWQgdXNlIG9ubHkxLTArBgNVBAMTJEVudHJ1c3QgZVNJTSBD +ZXJ0aWZpY2F0aW9uIEF1dGhvcml0eTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IA +BAdzwGHeQ1Wb2f4DmHTByR5/IWL3JugQ1U3908a++bHdlt+TTA7K4c5cYZ+51Yz/ +hg/bacxguPDh9uQUK6Wg3a6jcjBwMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/ +BAQDAgEGMBcGA1UdIAEB/wQNMAswCQYHZ4ESAQIBADAVBgNVHREEDjAMiApghkgB +hvpsFAoAMB0GA1UdDgQWBBQWcEt/NR42B/GMS3AAXDoAPf1BSjAKBggqhkjOPQQD +AgNJADBGAiEAspjXMvaBZyAg86Z0AAtT0yBRAi1EyaAfNz9kDJeAE04CIQC3efj8 +ATL7/tDBOhANy3cK8PS/1NIlu9vqMLCZsZvJ0Q== +-----END CERTIFICATE----- +issuer= + countryName = FR + organizationName = OBERTHUR TECHNOLOGIES + organizationalUnitName = TELECOM + commonName = MC4 OT ROOT CI v1 +notBefore=2016-11-15 00:00:01Z +notAfter=2046-11-08 23:59:59Z +-----BEGIN CERTIFICATE----- +MIICOjCCAeGgAwIBAgIBATAKBggqhkjOPQQDAjBbMQswCQYDVQQGEwJGUjEeMBwG +A1UEChMVT0JFUlRIVVIgVEVDSE5PTE9HSUVTMRAwDgYDVQQLEwdURUxFQ09NMRow +GAYDVQQDExFNQzQgT1QgUk9PVCBDSSB2MTAeFw0xNjExMTUwMDAwMDFaFw00NjEx +MDgyMzU5NTlaMFsxCzAJBgNVBAYTAkZSMR4wHAYDVQQKExVPQkVSVEhVUiBURUNI +Tk9MT0dJRVMxEDAOBgNVBAsTB1RFTEVDT00xGjAYBgNVBAMTEU1DNCBPVCBST09U +IENJIHYxMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEHb/Gajt3OZxuaDSklBQE +D4lOd6PGPLSvtfkM952ubdyy45tJwAeA0eEii0CLrFT6tcfXkW+H/5mQyMRXaAUk +T6OBlTCBkjAfBgNVHSMEGDAWgBTNbmC3LXoGPLyEYluR6A/jBAbhPjAdBgNVHQ4E +FgQUzW5gty16Bjy8hGJbkegP4wQG4T4wDgYDVR0PAQH/BAQDAgAGMBcGA1UdIAEB +/wQNMAswCQYHZ4ESAQIBADAWBgNVHREEDzANiAsrBgEEAYHvb7OITTAPBgNVHRMB +Af8EBTADAQH/MAoGCCqGSM49BAMCA0cAMEQCIEw4Nc7f2fDtoH+6ON/bknfDQxmT +ikThXjhpLtSrSKN2AiAxHxgC87L0FDnH8dJNlkdGX9c0JIx6oLheIplfS6k+jg== +-----END CERTIFICATE----- +issuer= + commonName = SubMan V4.2 CI Google Pixel + organizationName = Giesecke and Devrient GmbH + organizationalUnitName = Mobile Security + countryName = DE +notBefore=2017-05-10 00:00:00Z +notAfter=2027-05-10 00:00:00Z +-----BEGIN CERTIFICATE----- +MIICaTCCAg6gAwIBAgICASwwCgYIKoZIzj0EAwIwczElMCMGA1UEAxMcIFN1Yk1h +biBWNC4yIENJIEdvb2dsZSBQaXhlbDEjMCEGA1UEChMaR2llc2Vja2UgYW5kIERl +dnJpZW50IEdtYkgxGDAWBgNVBAsTD01vYmlsZSBTZWN1cml0eTELMAkGA1UEBhMC +REUwHhcNMTcwNTEwMDAwMDAwWhcNMjcwNTEwMDAwMDAwWjBzMSUwIwYDVQQDExwg +U3ViTWFuIFY0LjIgQ0kgR29vZ2xlIFBpeGVsMSMwIQYDVQQKExpHaWVzZWNrZSBh +bmQgRGV2cmllbnQgR21iSDEYMBYGA1UECxMPTW9iaWxlIFNlY3VyaXR5MQswCQYD +VQQGEwJERTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHNorfaJsGzqWNawyAhl +IAv9QL2/+b9RsUoso06t/dKX1MRr5CUJ51acvv5TAFhQKIml+dwLbFnV5aO+8W6Z +wxajgZEwgY4wHwYDVR0jBBgwFoAUtg8LiX/WMLiM/tYWH46oCMU4KsMwHQYDVR0O +BBYEFLYPC4l/1jC4jP7WFh+OqAjFOCrDMA4GA1UdDwEB/wQEAwIBBjAXBgNVHSAB +Af8EDTALMAkGB2eBEgECAQAwDwYDVR0TAQH/BAUwAwEB/zASBgNVHREECzAJiAcr +BgEEAdwPMAoGCCqGSM49BAMCA0kAMEYCIQDpoZcuAQrjATW8U+AWqMUJ0dY6nWW1 +R1QmFzVZ1yMXSwIhALCvRqkCtgiavdeFeSgsSNbY5Fhd+QoCltuSh1U4TE7A +-----END CERTIFICATE----- +issuer= + countryName = DE + commonName = SubMan V4.2 CI + organizationName = Giesecke and Devrient + organizationalUnitName = Mobile Security +notBefore=2016-08-12 13:51:48Z +notAfter=2026-08-12 13:51:48Z +-----BEGIN CERTIFICATE----- +MIICUjCCAfigAwIBAgIDQgAAMAoGCCqGSM49BAMCMGAxCzAJBgNVBAYTAkRFMRcw +FQYDVQQDEw5TdWJNYW4gVjQuMiBDSTEeMBwGA1UEChMVR2llc2Vja2UgYW5kIERl +dnJpZW50MRgwFgYDVQQLEw9Nb2JpbGUgU2VjdXJpdHkwHhcNMTYwODEyMTM1MTQ4 +WhcNMjYwODEyMTM1MTQ4WjBgMQswCQYDVQQGEwJERTEXMBUGA1UEAxMOU3ViTWFu +IFY0LjIgQ0kxHjAcBgNVBAoTFUdpZXNlY2tlIGFuZCBEZXZyaWVudDEYMBYGA1UE +CxMPTW9iaWxlIFNlY3VyaXR5MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEYIgl +VQr9wbXOlwPp8qMg5Df08Cli9Mc+lpr3Lwa9PlVA3QWlLeX4GfD4H3phLBqVIa17 +yHttmtheTxi0KoEqhKOBoDCBnTAdBgNVHQ4EFgQU6lOt7zMpuVCa/XVf1Ei4LcG8 +7P8wDgYDVR0PAQH/BAQDAgEGMBcGA1UdIAEB/wQNMAswCQYHZ4ESAQIBADAPBgNV +HRMBAf8EBTADAQH/MBIGA1UdEQQLMAmIBysGAQQB3A8wLgYDVR0fBCcwJTAjoCGg +H4YdaHR0cDovL2dpLWRlLmNvbS90ZXN0LmNybC5wZW0wCgYIKoZIzj0EAwIDSAAw +RQIhAMMx2L/VHDiOW+Fl/OuFmhCdizYM17Yn9zAVieKO2T0iAiANWtCMmY+DzkqK +yHxBFX0U2tBd682zP4DpgRt8j3Ylew== +-----END CERTIFICATE----- diff --git a/system/hardware/tici/hardware.py b/system/hardware/tici/hardware.py index 15c6e416955efb..af5006c1e10b86 100644 --- a/system/hardware/tici/hardware.py +++ b/system/hardware/tici/hardware.py @@ -511,9 +511,9 @@ def configure_modem(self): def reboot_modem(self): modem = self.get_modem() - for state in (0, 1): + for cmd in ('AT+CFUN=0', 'AT+CFUN=1,1'): try: - modem.Command(f'AT+CFUN={state}', math.ceil(TIMEOUT), dbus_interface=MM_MODEM, timeout=TIMEOUT) + modem.Command(cmd, math.ceil(TIMEOUT), dbus_interface=MM_MODEM, timeout=TIMEOUT) except Exception: pass diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index 2e7e6a0ba97366..eccd6aa3ad55cb 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -2,55 +2,143 @@ import atexit import base64 +import fcntl +import hashlib import math import os +import requests import serial +import subprocess import sys +import termios +import time -from collections.abc import Generator +from collections.abc import Callable, Generator +from contextlib import contextmanager +from typing import Any +from pathlib import Path -from openpilot.system.hardware.base import LPABase, Profile +from openpilot.common.time_helpers import system_time_valid +from openpilot.common.utils import retry +from openpilot.system.hardware.base import LPABase, LPAError, Profile +GSMA_CI_BUNDLE = str(Path(__file__).parent / 'gsma_ci_bundle.pem') -DEFAULT_DEVICE = "/dev/ttyUSB2" + +DEFAULT_DEVICE = "/dev/modem_at0" DEFAULT_BAUD = 9600 DEFAULT_TIMEOUT = 5.0 # https://euicc-manual.osmocom.org/docs/lpa/applet-id/ ISDR_AID = "A0000005591010FFFFFFFF8900000100" MM = "org.freedesktop.ModemManager1" MM_MODEM = MM + ".Modem" +MM_DEVICE_UID = '/sys/devices/platform/soc/a800000.ssusb/a800000.dwc3/xhci-hcd.0.auto/usb1/1-1' ES10X_MSS = 120 -DEBUG = os.environ.get("DEBUG") == "1" +LOCK_FILE = '/tmp/.lpa.lock' +DEBUG = True # TLV Tags TAG_ICCID = 0x5A +TAG_STATUS = 0x80 TAG_PROFILE_INFO_LIST = 0xBF2D +TAG_LIST_NOTIFICATION = 0xBF28 +TAG_RETRIEVE_NOTIFICATION = 0xBF2B +TAG_NOTIFICATION_METADATA = 0xBF2F +TAG_NOTIFICATION_SENT = 0xBF30 +TAG_ENABLE_PROFILE = 0xBF31 +TAG_DELETE_PROFILE = 0xBF33 +TAG_EUICC_INFO = 0xBF20 +TAG_PREPARE_DOWNLOAD = 0xBF21 +TAG_EUICC_CHALLENGE = 0xBF2E +TAG_SET_NICKNAME = 0xBF29 +TAG_PROFILE_INSTALL_RESULT = 0xBF37 +TAG_BPP = 0xBF36 +TAG_AUTH_SERVER = 0xBF38 +TAG_CANCEL_SESSION = 0xBF41 +TAG_OK = 0xA0 + +PROFILE_ERROR_CODES = { + 0x01: "iccidOrAidNotFound", 0x02: "profileNotInDisabledState", + 0x03: "disallowedByPolicy", 0x04: "wrongProfileReenabling", + 0x05: "catBusy", 0x06: "undefinedError", +} + +AUTH_SERVER_ERROR_CODES = { + 0x01: "eUICCVerificationFailed", 0x02: "eUICCCertificateExpired", + 0x03: "eUICCCertificateRevoked", 0x05: "invalidServerSignature", + 0x06: "euiccCiPKUnknown", 0x0A: "matchingIdRefused", + 0x10: "insufficientMemory", +} +BPP_COMMAND_NAMES = { + 0: "initialiseSecureChannel", 1: "configureISDP", 2: "storeMetadata", + 3: "storeMetadata2", 4: "replaceSessionKeys", 5: "loadProfileElements", +} +BPP_ERROR_REASONS = { + 1: "incorrectInputValues", 2: "invalidSignature", 3: "invalidTransactionId", + 4: "unsupportedCrtValues", 5: "unsupportedRemoteOperationType", + 6: "unsupportedProfileClass", 7: "scp03tStructureError", 8: "scp03tSecurityError", + 9: "iccidAlreadyExistsOnEuicc", 10: "insufficientMemoryForProfile", + 11: "installInterrupted", 12: "peProcessingError", 13: "dataMismatch", + 14: "invalidNAA", +} +BPP_ERROR_MESSAGES = { + 9: "This eSIM profile is already installed on this device.", + 10: "Not enough memory on the eUICC to install this profile.", + 12: "Profile installation failed. The QR code may have already been used.", +} + +# SGP.22 §5.2.6 — SM-DP+ reason/subject codes mapped to user-friendly messages +# https://www.gsma.com/solutions-and-impact/technologies/esim/wp-content/uploads/2021/07/SGP.22-v2.3.pdf +ES9P_ERROR_MESSAGES: dict[tuple[str, str], str] = { + ('3.8', '8.2.6'): "This eSIM profile is already installed on another device. Please use a new QR code.", + ('3.8', '8.2.1'): "This eSIM profile has expired. Please request a new QR code.", + ('3.8', '8.1'): "The SM-DP+ server refused this request.", + ('3.1', '8.2.6'): "This eSIM profile has been revoked by the carrier.", + ('3.9', '8.2.6'): "This eSIM profile download has already been completed.", + ('2.1', '8.8'): "The device is not compatible with this eSIM profile.", + ('1.2', '8.1'): "The SM-DP+ server is temporarily unavailable. Try again later.", +} STATE_LABELS = {0: "disabled", 1: "enabled", 255: "unknown"} ICON_LABELS = {0: "jpeg", 1: "png", 255: "unknown"} CLASS_LABELS = {0: "test", 1: "provisioning", 2: "operational", 255: "unknown"} +# TLV tag -> (field_name, decoder) +FieldMap = dict[int, tuple[str, Callable[[bytes], Any]]] + def b64e(data: bytes) -> str: return base64.b64encode(data).decode("ascii") +def base64_trim(s: str) -> str: + return "".join(c for c in s if c not in "\n\r \t") + + +def b64d(s: str) -> bytes: + return base64.b64decode(base64_trim(s)) + + class AtClient: def __init__(self, device: str, baud: int, timeout: float, debug: bool) -> None: self.debug = debug self.channel: str | None = None + self._device = device + self._baud = baud self._timeout = timeout self._serial: serial.Serial | None = None - try: - self._serial = serial.Serial(device, baudrate=baud, timeout=timeout) - self._serial.reset_input_buffer() - except (serial.SerialException, PermissionError, OSError): - pass + self._use_dbus = not os.path.exists(device) + if self.debug: + transport = "DBUS" if self._use_dbus else "serial" + print(f"AtClient: using {transport} transport", file=sys.stderr) def close(self) -> None: try: if self.channel: - self.query(f"AT+CCHC={self.channel}") + try: + self.query(f"AT+CCHC={self.channel}") + except (RuntimeError, TimeoutError): + pass self.channel = None finally: if self._serial: @@ -78,6 +166,15 @@ def _expect(self) -> list[str]: raise RuntimeError(f"AT command failed: {line}") lines.append(line) + def _reconnect_serial(self) -> None: + self.channel = None + try: + if self._serial: + self._serial.close() + except Exception: + pass + self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + def _get_modem(self): import dbus bus = dbus.SystemBus() @@ -100,35 +197,74 @@ def _dbus_query(self, cmd: str) -> list[str]: return lines def query(self, cmd: str) -> list[str]: - if self._serial: + if self._use_dbus: + return self._dbus_query(cmd) + if not self._serial: + self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + try: + self._send(cmd) + return self._expect() + except serial.SerialException: + self._reconnect_serial() self._send(cmd) return self._expect() - return self._dbus_query(cmd) - def open_isdr(self) -> None: - # close any stale logical channel from a previous crashed session - try: - self.query("AT+CCHC=1") - except RuntimeError: - pass + def _open_isdr_once(self) -> None: + if self.channel: + try: + self.query(f"AT+CCHC={self.channel}") + except RuntimeError: + pass + self.channel = None + # drain any unsolicited responses before opening + if self._serial and not self._use_dbus: + try: + self._serial.reset_input_buffer() + except (OSError, serial.SerialException, termios.error): + self._reconnect_serial() for line in self.query(f'AT+CCHO="{ISDR_AID}"'): if line.startswith("+CCHO:") and (ch := line.split(":", 1)[1].strip()): self.channel = ch return raise RuntimeError("Failed to open ISD-R application") - def send_apdu(self, apdu: bytes) -> tuple[bytes, int, int]: - if not self.channel: - raise RuntimeError("Logical channel is not open") - hex_payload = apdu.hex().upper() - for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'): - if line.startswith("+CGLA:"): - parts = line.split(":", 1)[1].split(",", 1) - if len(parts) == 2: - data = bytes.fromhex(parts[1].strip().strip('"')) - if len(data) >= 2: - return data[:-2], data[-2], data[-1] - raise RuntimeError("Missing +CGLA response") + def open_isdr(self) -> None: + for attempt in range(10): + try: + self._open_isdr_once() + return + except (RuntimeError, TimeoutError, termios.error) as e: + if self.debug: + print(f"open_isdr failed, trying again", file=sys.stderr) + if attempt == 3: + # SIM may be stuck (CME ERROR 13) — reset modem via lte.sh + subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) + time.sleep(5) + self._reconnect_serial() + else: + time.sleep(2.0) + raise RuntimeError("Failed to open ISD-R after retries") + + def send_apdu(self, apdu: bytes, max_retries: int = 3) -> tuple[bytes, int, int]: + for attempt in range(max_retries): + try: + if not self.channel: + self.open_isdr() + hex_payload = apdu.hex().upper() + for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'): + if line.startswith("+CGLA:"): + parts = line.split(":", 1)[1].split(",", 1) + if len(parts) == 2: + data = bytes.fromhex(parts[1].strip().strip('"')) + if len(data) >= 2: + return data[:-2], data[-2], data[-1] + raise RuntimeError("Missing +CGLA response") + except (RuntimeError, ValueError): + self.channel = None + if attempt == max_retries - 1: + raise + time.sleep(1 + attempt) + raise RuntimeError("send_apdu failed") # --- TLV utilities --- @@ -170,12 +306,37 @@ def find_tag(data: bytes, target: int) -> bytes | None: return next((v for t, v in iter_tlv(data) if t == target), None) +def require_tag(data: bytes, target: int, label: str = "") -> bytes: + v = find_tag(data, target) + if v is None: + raise RuntimeError(f"Missing {label or f'tag 0x{target:X}'}") + return v + + def tbcd_to_string(raw: bytes) -> str: return "".join(str(n) for b in raw for n in (b & 0x0F, b >> 4) if n <= 9) -# Profile field decoders: TLV tag -> (field_name, decoder) -_PROFILE_FIELDS = { +def string_to_tbcd(s: str) -> bytes: + digits = [int(c) for c in s if c.isdigit()] + return bytes(digits[i] | ((digits[i + 1] if i + 1 < len(digits) else 0xF) << 4) for i in range(0, len(digits), 2)) + + +def encode_tlv(tag: int, value: bytes) -> bytes: + tag_bytes = bytes([(tag >> 8) & 0xFF, tag & 0xFF]) if tag > 255 else bytes([tag]) + vlen = len(value) + if vlen <= 127: + return tag_bytes + bytes([vlen]) + value + length_bytes = vlen.to_bytes((vlen.bit_length() + 7) // 8, "big") + return tag_bytes + bytes([0x80 | len(length_bytes)]) + length_bytes + value + + +def int_bytes(n: int) -> bytes: + """Encode a positive integer as minimal big-endian bytes (at least 1 byte).""" + return n.to_bytes((n.bit_length() + 7) // 8 or 1, "big") + + +PROFILE: FieldMap = { TAG_ICCID: ("iccid", tbcd_to_string), 0x4F: ("isdpAid", lambda v: v.hex().upper()), 0x9F70: ("profileState", lambda v: STATE_LABELS.get(v[0], "unknown")), @@ -187,12 +348,19 @@ def tbcd_to_string(raw: bytes) -> str: 0x95: ("profileClass", lambda v: CLASS_LABELS.get(v[0], "unknown")), } +NOTIFICATION: FieldMap = { + TAG_STATUS: ("seqNumber", lambda v: int.from_bytes(v, "big")), + 0x81: ("profileManagementOperation", lambda v: next((m for m in [0x80, 0x40, 0x20, 0x10] if len(v) >= 2 and v[1] & m), 0xFF)), + 0x0C: ("notificationAddress", lambda v: v.decode("utf-8", errors="ignore")), + TAG_ICCID: ("iccid", tbcd_to_string), +} + -def _decode_profile_fields(data: bytes) -> dict: - """Parse known profile metadata TLV fields into a dict.""" - result = {} +def decode_struct(data: bytes, field_map: FieldMap) -> dict[str, Any]: + """Parse TLV data using a {tag: (field_name, decoder)} map into a dict.""" + result: dict[str, Any] = {name: None for name, _ in field_map.values()} for tag, value in iter_tlv(data): - if (field := _PROFILE_FIELDS.get(tag)): + if (field := field_map.get(tag)): result[field[0]] = field[1](value) return result @@ -225,57 +393,395 @@ def es10x_command(client: AtClient, data: bytes) -> bytes: # --- Profile operations --- def decode_profiles(blob: bytes) -> list[dict]: - root = find_tag(blob, TAG_PROFILE_INFO_LIST) - if root is None: - raise RuntimeError("Missing ProfileInfoList") - list_ok = find_tag(root, 0xA0) + root = require_tag(blob, TAG_PROFILE_INFO_LIST, "ProfileInfoList") + list_ok = find_tag(root, TAG_OK) if list_ok is None: return [] - defaults = {name: None for name, _ in _PROFILE_FIELDS.values()} - return [{**defaults, **_decode_profile_fields(value)} for tag, value in iter_tlv(list_ok) if tag == 0xE3] + return [decode_struct(value, PROFILE) for tag, value in iter_tlv(list_ok) if tag == 0xE3] def list_profiles(client: AtClient) -> list[dict]: return decode_profiles(es10x_command(client, TAG_PROFILE_INFO_LIST.to_bytes(2, "big") + b"\x00")) -class TiciLPA(LPABase): - _instance = None +# --- ES9P HTTP --- + +def es9p_request(smdp_address: str, endpoint: str, payload: dict, error_prefix: str = "Request") -> dict: + if not system_time_valid(): + raise RuntimeError("System time is not set; TLS certificate validation requires a valid clock") + url = f"https://{smdp_address}/gsma/rsp2/es9plus/{endpoint}" + headers = {"User-Agent": "gsma-rsp-lpad", "X-Admin-Protocol": "gsma/rsp/v2.3.0", "Content-Type": "application/json"} + resp = requests.post(url, json=payload, headers=headers, timeout=30, verify=GSMA_CI_BUNDLE) + resp.raise_for_status() + if not resp.content: + return {} + data = resp.json() + if "header" in data and "functionExecutionStatus" in data["header"]: + status = data["header"]["functionExecutionStatus"] + if status.get("status") == "Failed": + sd = status.get("statusCodeData", {}) + reason = sd.get('reasonCode', 'unknown') + subject = sd.get('subjectCode', 'unknown') + msg = ES9P_ERROR_MESSAGES.get((reason, subject), + f"{error_prefix} failed: {reason}/{subject} - {sd.get('message', 'unknown')}") + raise RuntimeError(msg) + return data + + +# --- Notifications --- + +def list_notifications(client: AtClient) -> list[dict]: + response = es10x_command(client, encode_tlv(TAG_LIST_NOTIFICATION, b"")) + root = require_tag(response, TAG_LIST_NOTIFICATION, "ListNotificationResponse") + metadata_list = find_tag(root, TAG_OK) + if metadata_list is None: + return [] + notifications: list[dict] = [] + for tag, value in iter_tlv(metadata_list): + if tag != TAG_NOTIFICATION_METADATA: + continue + notification = decode_struct(value, NOTIFICATION) + if notification["seqNumber"] is not None and notification["profileManagementOperation"] is not None and notification["notificationAddress"]: + notifications.append(notification) + return notifications + + +def process_notifications(client: AtClient) -> None: + for notification in list_notifications(client): + seq_number, smdp_address = notification["seqNumber"], notification["notificationAddress"] + try: + # retrieve notification + request = encode_tlv(TAG_RETRIEVE_NOTIFICATION, encode_tlv(TAG_OK, encode_tlv(TAG_STATUS, int_bytes(seq_number)))) + response = es10x_command(client, request) + content = require_tag(require_tag(response, TAG_RETRIEVE_NOTIFICATION, "RetrieveNotificationsListResponse"), + TAG_OK, "RetrieveNotificationsListResponse") + pending_notif = next((v for t, v in iter_tlv(content) if t in (TAG_PROFILE_INSTALL_RESULT, 0x30)), None) + if pending_notif is None: + raise RuntimeError("Missing PendingNotification") + + # send to SM-DP+ + es9p_request(smdp_address, "handleNotification", {"pendingNotification": b64e(pending_notif)}, "HandleNotification") + + # remove notification + response = es10x_command(client, encode_tlv(TAG_NOTIFICATION_SENT, encode_tlv(TAG_STATUS, int_bytes(seq_number)))) + root = require_tag(response, TAG_NOTIFICATION_SENT, "NotificationSentResponse") + if int.from_bytes(require_tag(root, TAG_STATUS, "RemoveNotificationFromList status"), "big") != 0: + raise RuntimeError("RemoveNotificationFromList failed") + except Exception: + pass - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance + +# --- Authentication & Download --- + +def get_challenge_and_info(client: AtClient) -> tuple[bytes, bytes]: + challenge_resp = es10x_command(client, encode_tlv(TAG_EUICC_CHALLENGE, b"")) + challenge = require_tag(require_tag(challenge_resp, TAG_EUICC_CHALLENGE, "GetEuiccDataResponse"), + TAG_STATUS, "challenge in response") + info_resp = es10x_command(client, encode_tlv(TAG_EUICC_INFO, b"")) + if not info_resp.startswith(bytes([0xBF, 0x20])): + raise RuntimeError("Missing GetEuiccInfo1Response") + return challenge, info_resp + + +def authenticate_server(client: AtClient, b64_signed1: str, b64_sig1: str, b64_pk_id: str, b64_cert: str, matching_id: str | None = None) -> str: + tac = bytes([0x35, 0x29, 0x06, 0x11]) + device_info = encode_tlv(TAG_STATUS, tac) + encode_tlv(0xA1, b"") + ctx_inner = b"" + if matching_id: + ctx_inner += encode_tlv(TAG_STATUS, matching_id.encode("utf-8")) + ctx_inner += encode_tlv(0xA1, device_info) + content = base64.b64decode(b64_signed1) + base64.b64decode(b64_sig1) + base64.b64decode(b64_pk_id) + base64.b64decode(b64_cert) + encode_tlv(0xA0, ctx_inner) + response = es10x_command(client, encode_tlv(TAG_AUTH_SERVER, content)) + if not response.startswith(bytes([0xBF, 0x38])): + raise RuntimeError("Invalid AuthenticateServerResponse") + root = find_tag(response, TAG_AUTH_SERVER) + if root is not None: + error_tag = find_tag(root, 0xA1) + if error_tag is not None: + code = int.from_bytes(error_tag, "big") if error_tag else 0 + raise RuntimeError(f"AuthenticateServer rejected by eUICC: {AUTH_SERVER_ERROR_CODES.get(code, 'unknown')} (0x{code:02X})") + return b64e(response) + + +def prepare_download(client: AtClient, b64_signed2: str, b64_sig2: str, b64_cert: str, cc: str | None = None) -> str: + smdp_signed2 = base64.b64decode(b64_signed2) + smdp_signature2 = base64.b64decode(b64_sig2) + smdp_certificate = base64.b64decode(b64_cert) + smdp_signed2_root = find_tag(smdp_signed2, 0x30) + if smdp_signed2_root is None: + raise RuntimeError("Invalid smdpSigned2") + transaction_id = find_tag(smdp_signed2_root, TAG_STATUS) + cc_required_flag = find_tag(smdp_signed2_root, 0x01) + if transaction_id is None or cc_required_flag is None: + raise RuntimeError("Invalid smdpSigned2") + content = smdp_signed2 + smdp_signature2 + if int.from_bytes(cc_required_flag, "big") != 0: + if not cc: + raise RuntimeError("Confirmation code required but not provided") + content += encode_tlv(0x04, hashlib.sha256(hashlib.sha256(cc.encode("utf-8")).digest() + transaction_id).digest()) + content += smdp_certificate + response = es10x_command(client, encode_tlv(TAG_PREPARE_DOWNLOAD, content)) + if not response.startswith(bytes([0xBF, 0x21])): + raise RuntimeError("Invalid PrepareDownloadResponse") + return b64e(response) + + +def _parse_tlv_header_len(data: bytes) -> int: + tag_len = 2 if data[0] & 0x1F == 0x1F else 1 + length_byte = data[tag_len] + return tag_len + (1 + (length_byte & 0x7F) if length_byte & 0x80 else 1) + + +def load_bpp(client: AtClient, b64_bpp: str) -> dict: + bpp = b64d(b64_bpp) + if not bpp.startswith(bytes([0xBF, 0x36])): + raise RuntimeError("Invalid BoundProfilePackage") + + bpp_root_value = None + for tag, value, start, end in iter_tlv(bpp, with_positions=True): + if tag == TAG_BPP: + bpp_root_value = value + bpp_value_start = start + _parse_tlv_header_len(bpp[start:end]) + break + if bpp_root_value is None: + raise RuntimeError("Invalid BoundProfilePackage") + + chunks: list[bytes] = [] + for tag, value, start, end in iter_tlv(bpp_root_value, with_positions=True): + if tag == 0xBF23: + chunks.append(bpp[0 : bpp_value_start + end]) + elif tag == 0xA0: + chunks.append(bpp[bpp_value_start + start : bpp_value_start + end]) + elif tag in (0xA1, 0xA3): + hdr_len = _parse_tlv_header_len(bpp_root_value[start:end]) + chunks.append(bpp[bpp_value_start + start : bpp_value_start + start + hdr_len]) + for _, _, child_start, child_end in iter_tlv(value, with_positions=True): + chunks.append(value[child_start:child_end]) + elif tag == 0xA2: + chunks.append(bpp[bpp_value_start + start : bpp_value_start + end]) + + result: dict[str, Any] = {"seqNumber": 0, "success": False, "bppCommandId": None, "errorReason": None} + for chunk in chunks: + response = es10x_command(client, chunk) + if not response: + continue + root = find_tag(response, TAG_PROFILE_INSTALL_RESULT) + if not root: + continue + result_data = find_tag(root, 0xBF27) + if not result_data: + break + notif_meta = find_tag(result_data, TAG_NOTIFICATION_METADATA) + if notif_meta: + seq_num = find_tag(notif_meta, TAG_STATUS) + if seq_num: + result["seqNumber"] = int.from_bytes(seq_num, "big") + final_result = find_tag(result_data, 0xA2) + if final_result: + for tag, value in iter_tlv(final_result): + if tag == 0xA0: + result["success"] = True + elif tag == 0xA1: + bpp_cmd = find_tag(value, TAG_STATUS) + if bpp_cmd: + result["bppCommandId"] = int.from_bytes(bpp_cmd, "big") + err = find_tag(value, 0x81) + if err: + result["errorReason"] = int.from_bytes(err, "big") + break + if not result["success"] and result["errorReason"] is not None: + msg = BPP_ERROR_MESSAGES.get(result["errorReason"]) + if not msg: + cmd_name = BPP_COMMAND_NAMES.get(result["bppCommandId"], f"unknown({result['bppCommandId']})") + err_name = BPP_ERROR_REASONS.get(result["errorReason"], f"unknown({result['errorReason']})") + msg = f"Profile installation failed at {cmd_name}: {err_name}" + raise RuntimeError(msg) + return result + + +def parse_metadata(b64_metadata: str) -> dict: + root = find_tag(b64d(b64_metadata), 0xBF25) + if root is None: + raise RuntimeError("Invalid profileMetadata") + return decode_struct(root, PROFILE) + + +def cancel_session(client: AtClient, transaction_id: bytes, reason: int = 127) -> str: + content = encode_tlv(0x80, transaction_id) + encode_tlv(0x81, bytes([reason])) + response = es10x_command(client, encode_tlv(TAG_CANCEL_SESSION, content)) + return b64e(response) + + +def parse_lpa_activation_code(activation_code: str) -> tuple[str, str, str]: + if not activation_code.startswith("LPA:"): + raise ValueError("Invalid activation code format") + parts = activation_code[4:].split("$") + if len(parts) != 3: + raise ValueError("Invalid activation code format") + return parts[0], parts[1], parts[2] + + +def download_profile(client: AtClient, activation_code: str) -> str: + """Download and install an eSIM profile. Returns the ICCID of the installed profile.""" + _, smdp, matching_id = parse_lpa_activation_code(activation_code) + + challenge, euicc_info = get_challenge_and_info(client) + + payload: dict[str, str] = {"smdpAddress": smdp, "euiccChallenge": b64e(challenge), "euiccInfo1": b64e(euicc_info)} + if matching_id: + payload["matchingId"] = matching_id + auth = es9p_request(smdp, "initiateAuthentication", payload, "Authentication") + tx_id = base64_trim(auth.get("transactionId", "")) + tx_id_bytes = base64.b64decode(tx_id) if tx_id else b"" + + try: + b64_auth_resp = authenticate_server( + client, base64_trim(auth.get("serverSigned1", "")), base64_trim(auth.get("serverSignature1", "")), + base64_trim(auth.get("euiccCiPKIdToBeUsed", "")), base64_trim(auth.get("serverCertificate", "")), + matching_id=matching_id) + + cli = es9p_request(smdp, "authenticateClient", {"transactionId": tx_id, "authenticateServerResponse": b64_auth_resp}, "Authentication") + metadata = parse_metadata(base64_trim(cli.get("profileMetadata", ""))) + iccid = metadata.get("iccid", "") + + b64_prep = prepare_download( + client, base64_trim(cli.get("smdpSigned2", "")), base64_trim(cli.get("smdpSignature2", "")), base64_trim(cli.get("smdpCertificate", ""))) + + bpp = es9p_request(smdp, "getBoundProfilePackage", {"transactionId": tx_id, "prepareDownloadResponse": b64_prep}, "GetBoundProfilePackage") + + result = load_bpp(client, base64_trim(bpp.get("boundProfilePackage", ""))) + if not result["success"]: + raise RuntimeError(f"Profile installation failed: {result}") + return iccid + except Exception: + if tx_id_bytes: + b64_cancel_resp = "" + try: + b64_cancel_resp = cancel_session(client, tx_id_bytes) + except Exception: + pass + try: + es9p_request(smdp, "cancelSession", {"transactionId": tx_id, "cancelSessionResponse": b64_cancel_resp}, "CancelSession") + except Exception: + pass + raise + + +def set_profile_nickname(client: AtClient, iccid: str, nickname: str) -> None: + nickname_bytes = nickname.encode("utf-8") + if len(nickname_bytes) > 64: + raise ValueError("Profile nickname must be 64 bytes or less") + content = encode_tlv(TAG_ICCID, string_to_tbcd(iccid)) + encode_tlv(0x90, nickname_bytes) + response = es10x_command(client, encode_tlv(TAG_SET_NICKNAME, content)) + root = require_tag(response, TAG_SET_NICKNAME, "SetNicknameResponse") + code = require_tag(root, TAG_STATUS, "status in SetNicknameResponse")[0] + if code == 0x01: + raise LPAError(f"profile {iccid} not found") + if code != 0x00: + raise RuntimeError(f"SetNickname failed with status 0x{code:02X}") + + + +class TiciLPA(LPABase): def __init__(self): - if hasattr(self, '_client'): - return self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT, debug=DEBUG) - self._client.open_isdr() atexit.register(self._client.close) + @contextmanager + def _acquire_channel(self, inhibit: bool = False): + inhibit_proc = None + if inhibit: + inhibit_proc = subprocess.Popen(['sudo', 'mmcli', f'--inhibit-device={MM_DEVICE_UID}'], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + time.sleep(0.5) + fd = os.open(LOCK_FILE, os.O_CREAT | os.O_RDWR) + try: + fcntl.flock(fd, fcntl.LOCK_EX) + self._client.open_isdr() + yield + finally: + if self._client.channel: + try: + self._client.query(f"AT+CCHC={self._client.channel}") + except (RuntimeError, TimeoutError): + pass + self._client.channel = None + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + if inhibit_proc: + inhibit_proc.terminate() + inhibit_proc.wait(timeout=5) + def list_profiles(self) -> list[Profile]: - return [ - Profile( - iccid=p.get("iccid", ""), - nickname=p.get("profileNickname") or "", - enabled=p.get("profileState") == "enabled", - provider=p.get("serviceProviderName") or "", - ) - for p in list_profiles(self._client) - ] + with self._acquire_channel(): + return [ + Profile( + iccid=p.get("iccid", ""), + nickname=p.get("profileNickname") or "", + enabled=p.get("profileState") == "enabled", + provider=p.get("serviceProviderName") or "", + ) + for p in list_profiles(self._client) + ] def get_active_profile(self) -> Profile | None: return None + def process_notifications(self) -> None: + with self._acquire_channel(): + process_notifications(self._client) + def delete_profile(self, iccid: str) -> None: - return None + if self.is_comma_profile(iccid): + raise LPAError("refusing to delete a comma profile") + with self._acquire_channel(inhibit=True): + request = encode_tlv(TAG_DELETE_PROFILE, encode_tlv(TAG_ICCID, string_to_tbcd(iccid))) + response = es10x_command(self._client, request) + root = require_tag(response, TAG_DELETE_PROFILE, "DeleteProfileResponse") + code = require_tag(root, TAG_STATUS, "status in DeleteProfileResponse")[0] + if code != 0x00: + raise LPAError(f"DeleteProfile failed: {PROFILE_ERROR_CODES.get(code, 'unknown')} (0x{code:02X})") def download_profile(self, qr: str, nickname: str | None = None) -> None: - return None + with self._acquire_channel(): + iccid = download_profile(self._client, qr) + if nickname and iccid: + set_profile_nickname(self._client, iccid, nickname) def nickname_profile(self, iccid: str, nickname: str) -> None: - return None + with self._acquire_channel(): + set_profile_nickname(self._client, iccid, nickname) + + def _enable_profile(self, iccid: str, refresh: bool = False) -> int: + inner = encode_tlv(TAG_OK, encode_tlv(TAG_ICCID, string_to_tbcd(iccid))) + inner += b'\x01\x01\x01' if refresh else b'\x01\x01\x00' + response = es10x_command(self._client, encode_tlv(TAG_ENABLE_PROFILE, inner)) + root = require_tag(response, TAG_ENABLE_PROFILE, "EnableProfileResponse") + return require_tag(root, TAG_STATUS, "status in EnableProfileResponse")[0] + + def _has_sim_presence(self) -> bool: + from openpilot.system.hardware.tici.hardware import get_device_type + return get_device_type() == "tizi" def switch_profile(self, iccid: str) -> None: - return None + refresh = self._has_sim_presence() + with self._acquire_channel(inhibit=True): + code = self._enable_profile(iccid, refresh=refresh) + if code == 0x05: # catBusy — reset modem and retry + subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) + time.sleep(5) + self._client._reconnect_serial() + self._client.open_isdr() + code = self._enable_profile(iccid, refresh=refresh) + if code not in (0x00, 0x02): + raise LPAError(f"EnableProfile failed: {PROFILE_ERROR_CODES.get(code, 'unknown')} (0x{code:02X})") + if code == 0x00: + if refresh: + # SIM toolkit refresh briefly disrupts the modem — wait for it to settle + time.sleep(1) + self._client._serial.reset_input_buffer() + else: + self._client._serial.write(b'AT+CFUN=0\rAT+CFUN=1\r') + time.sleep(2) + self._client._serial.reset_input_buffer() diff --git a/system/ui/lib/cellular_manager.py b/system/ui/lib/cellular_manager.py new file mode 100644 index 00000000000000..414dbd01dd8a42 --- /dev/null +++ b/system/ui/lib/cellular_manager.py @@ -0,0 +1,174 @@ +import subprocess +import time +import threading +from collections.abc import Callable + +from openpilot.common.swaglog import cloudlog +from openpilot.system.hardware.base import LPABase, Profile + +MODEM_IP_POLL_INTERVAL = 5.0 +DOWNLOAD_TIMEOUT = 120 # seconds + + +def _get_modem_ip() -> str: + for iface in ("ppp0", "wwan0"): + try: + out = subprocess.check_output(["ip", "-4", "-o", "addr", "show", iface], timeout=1, text=True, stderr=subprocess.DEVNULL) + parts = out.split() + for i, part in enumerate(parts): + if part == "inet" and i + 1 < len(parts): + return parts[i + 1].split("/")[0] + except Exception: + pass + return "" + + +LPA_RETRY_INTERVAL = 5.0 + + +def _get_lpa() -> LPABase: + from openpilot.system.hardware import HARDWARE + return HARDWARE.get_sim_lpa() + + +class CellularManager: + def __init__(self): + self._lpa: LPABase | None = None + self._profiles: list[Profile] = [] + self._busy: bool = False + self._switching_iccid: str | None = None + + self._lock = threading.Lock() + self._callback_queue: list[Callable] = [] + + self._profiles_updated_cbs: list[Callable[[list[Profile]], None]] = [] + self._operation_error_cbs: list[Callable[[str], None]] = [] + + self._modem_ip: str = _get_modem_ip() + self._last_ip_poll: float = 0.0 + + def add_callbacks(self, profiles_updated: Callable | None = None, operation_error: Callable | None = None): + if profiles_updated: + self._profiles_updated_cbs.append(profiles_updated) + if operation_error: + self._operation_error_cbs.append(operation_error) + + @property + def modem_ip(self) -> str: + return self._modem_ip + + def process_callbacks(self): + to_run, self._callback_queue = self._callback_queue, [] + for cb in to_run: + cb() + + now = time.monotonic() + if now - self._last_ip_poll >= MODEM_IP_POLL_INTERVAL: + self._last_ip_poll = now + self._modem_ip = _get_modem_ip() + + @property + def profiles(self) -> list[Profile]: + return self._profiles + + @property + def busy(self) -> bool: + return self._busy + + @property + def switching_iccid(self) -> str | None: + return self._switching_iccid + + def is_comma_profile(self, iccid: str) -> bool: + return any(p.iccid == iccid and p.provider == 'Webbing' for p in self._profiles) + + def _ensure_lpa(self) -> LPABase: + if self._lpa is None: + self._lpa = _get_lpa() + return self._lpa + + def _finish(self, profiles: list[Profile] | None = None, error: str | None = None): + self._busy = False + self._switching_iccid = None + if profiles is not None: + self._profiles = profiles + for cb in self._profiles_updated_cbs: + cb(profiles) + if error is not None: + for cb in self._operation_error_cbs: + cb(error) + + def _run_operation(self, fn: Callable, error_msg: str): + self._busy = True + + def worker(): + try: + with self._lock: + lpa = self._ensure_lpa() + fn(lpa) + profiles = lpa.list_profiles() + self._callback_queue.append(lambda: self._finish(profiles=profiles)) + except Exception as e: + cloudlog.exception(error_msg) + err = str(e) + self._callback_queue.append(lambda: self._finish(error=err)) + + threading.Thread(target=worker, daemon=True).start() + + def refresh_profiles(self): + def worker(): + try: + with self._lock: + lpa = self._ensure_lpa() + lpa.process_notifications() + profiles = lpa.list_profiles() + self._callback_queue.append(lambda: self._finish_refresh(profiles)) + except Exception: + cloudlog.exception("Failed to list eSIM profiles") + time.sleep(LPA_RETRY_INTERVAL) + self._callback_queue.append(lambda: self.refresh_profiles()) + + threading.Thread(target=worker, daemon=True).start() + + def _finish_refresh(self, profiles: list[Profile]): + if self._busy: + return + self._profiles = profiles + for cb in self._profiles_updated_cbs: + cb(profiles) + + def switch_profile(self, iccid: str): + self._switching_iccid = iccid + self._run_operation(lambda lpa: lpa.switch_profile(iccid), "Failed to switch eSIM profile") + + def delete_profile(self, iccid: str): + self._run_operation(lambda lpa: lpa.delete_profile(iccid), "Failed to delete eSIM profile") + + def download_profile(self, qr: str, nickname: str | None = None): + self._busy = True + + def worker(): + try: + with self._lock: + lpa = self._ensure_lpa() + lpa.download_profile(qr, nickname) + profiles = lpa.list_profiles() + self._callback_queue.append(lambda: self._finish(profiles=profiles)) + except Exception as e: + cloudlog.exception("Failed to download eSIM profile") + err = str(e) + self._callback_queue.append(lambda: self._finish(error=err)) + + t = threading.Thread(target=worker, daemon=True) + t.start() + + def watchdog(): + t.join(timeout=DOWNLOAD_TIMEOUT) + if t.is_alive(): + cloudlog.error("eSIM profile download timed out") + self._callback_queue.append(lambda: self._finish(error="Profile download timed out. Please try again.")) + + threading.Thread(target=watchdog, daemon=True).start() + + def nickname_profile(self, iccid: str, nickname: str): + self._run_operation(lambda lpa: lpa.nickname_profile(iccid, nickname), "Failed to update eSIM profile nickname") diff --git a/system/ui/widgets/esim.py b/system/ui/widgets/esim.py new file mode 100644 index 00000000000000..63d28c0f693e9c --- /dev/null +++ b/system/ui/widgets/esim.py @@ -0,0 +1,382 @@ +import threading +import urllib.request +from enum import IntEnum +from functools import partial + +import pyray as rl + +from openpilot.system.ui.lib.application import gui_app, FontWeight +from openpilot.system.ui.lib.cellular_manager import CellularManager +from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel +from openpilot.system.hardware.base import Profile +from openpilot.system.ui.widgets import DialogResult, Widget +from openpilot.system.ui.widgets.button import ButtonStyle, Button +from openpilot.system.ui.widgets.confirm_dialog import ConfirmDialog, alert_dialog +from openpilot.system.ui.widgets.keyboard import Keyboard +from openpilot.system.ui.widgets.label import gui_label + +try: + import zxingcpp + from msgq.visionipc import VisionStreamType + from openpilot.selfdrive.ui.onroad.cameraview import CameraView + from openpilot.common.params import Params + from openpilot.selfdrive.ui.ui_state import device +except Exception: + zxingcpp = None + VisionStreamType = None + CameraView = None + Params = None + device = None + +ITEM_HEIGHT = 160 +ICON_SIZE = 50 +COMMA_ICON_SIZE = 40 +MAX_NICKNAME_LENGTH = 64 + + +class UIState(IntEnum): + IDLE = 0 + SWITCHING = 1 + DELETING = 2 + DOWNLOADING = 3 + + +def _profile_display_name(profile: Profile) -> str: + return profile.nickname or profile.provider or profile.iccid[:12] + + +def _is_valid_lpa_code(text: str) -> bool: + if not text.startswith("LPA:"): + return False + parts = text[4:].split("$") + return len(parts) == 3 and all(parts) + + +class QRScannerDialog(Widget): + def __init__(self, on_qr_detected): + super().__init__() + self._on_qr_detected = on_qr_detected + self._camera_view = CameraView("camerad", VisionStreamType.VISION_STREAM_DRIVER) if CameraView else None + self._detected = False + + def show_event(self): + super().show_event() + if Params: + Params().put_bool("IsDriverViewEnabled", True) + + def hide_event(self): + super().hide_event() + if Params: + Params().put_bool("IsDriverViewEnabled", False) + + def __del__(self): + if self._camera_view: + self._camera_view.close() + + def _update_state(self): + super()._update_state() + if self._camera_view: + self._camera_view._update_state() + + if self._detected or not self._camera_view or not self._camera_view.frame: + return + + frame = self._camera_view.frame + gray = frame.data[:frame.height * frame.stride].reshape(frame.height, frame.stride)[:, :frame.width] + results = zxingcpp.read_barcodes(gray) + if results: + data = results[0].text + if _is_valid_lpa_code(data): + self._detected = True + gui_app.pop_widget() + self._on_qr_detected(data) + + def _render(self, rect): + rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height)) + + if self._camera_view: + self._camera_view._render(rect) + + if not self._camera_view or not self._camera_view.frame: + gui_label(rect, "camera starting", font_size=100, font_weight=FontWeight.BOLD, + alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER) + else: + label_y = rect.y + rect.height * 3 / 4 + label_rect = rl.Rectangle(rect.x, label_y, rect.width, rect.y + rect.height - label_y) + gui_label(label_rect, "hold QR code to camera", font_size=64, font_weight=FontWeight.BOLD, + alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER) + + rl.end_scissor_mode() + + def _handle_mouse_release(self, mouse_pos): + # Tap anywhere to dismiss + gui_app.pop_widget() + + +class InstallingDialog(Widget): + DOT_STEP = 0.6 + + def __init__(self): + super().__init__() + self._show_time = 0.0 + + def show_event(self): + super().show_event() + self._show_time = rl.get_time() + if device: + device.set_override_interactive_timeout(600) + + def hide_event(self): + super().hide_event() + if device: + device.set_override_interactive_timeout(None) + + def _render(self, rect): + margin = 200 + dialog_rect = rl.Rectangle(margin, margin, gui_app.width - 2 * margin, gui_app.height - 2 * margin) + rl.draw_rectangle_rec(dialog_rect, rl.Color(27, 27, 27, 255)) + + t = (rl.get_time() - self._show_time) % (self.DOT_STEP * 2) + dots = "." * min(int(t / (self.DOT_STEP / 4)), 3) + gui_label(dialog_rect, f"Installing eSIM profile{dots}", font_size=70, font_weight=FontWeight.BOLD, + alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER, + color=rl.Color(201, 201, 201, 255)) + + +class ESimManagerUI(Widget): + def __init__(self, cellular_manager: CellularManager): + super().__init__() + self._cellular_manager = cellular_manager + self.state: UIState = UIState.IDLE + self._state_iccid: str | None = None + self.scroll_panel = GuiScrollPanel() + self.keyboard = Keyboard(max_text_size=MAX_NICKNAME_LENGTH, min_text_size=0) + + self._profiles: list[Profile] = [] + self._profile_buttons: dict[str, Button] = {} + self._forget_buttons: dict[str, Button] = {} + self._rename_buttons: dict[str, Button] = {} + self._active_button = Button("Active", lambda: None, font_size=45, button_style=ButtonStyle.NORMAL) + self._active_button.set_enabled(False) + self._add_button = Button("Add eSIM", self._on_add_profile, font_size=55, button_style=ButtonStyle.PRIMARY) + + self._installing_dialog: InstallingDialog | None = None + self._pending_lpa_code: str | None = None + + self._cellular_manager.add_callbacks( + profiles_updated=self._on_profiles_updated, + operation_error=self._on_error, + ) + + def show_event(self): + super().show_event() + self._on_profiles_updated(self._cellular_manager.profiles) + self._cellular_manager.refresh_profiles() + gui_app.add_nav_stack_tick(self._cellular_manager.process_callbacks) + + def hide_event(self): + super().hide_event() + gui_app.remove_nav_stack_tick(self._cellular_manager.process_callbacks) + + def _update_state(self): + self._cellular_manager.process_callbacks() + + def _on_profiles_updated(self, profiles: list[Profile]): + if self._installing_dialog: + gui_app.pop_widget() + self._installing_dialog = None + + self._profiles = profiles + self._profile_buttons.clear() + self._forget_buttons.clear() + self._rename_buttons.clear() + + for p in self._profiles: + is_comma = self._cellular_manager.is_comma_profile(p.iccid) + display = "comma prime" if is_comma else _profile_display_name(p) + self._profile_buttons[p.iccid] = Button(display, partial(self._on_profile_clicked, p.iccid), font_size=55, + text_alignment=rl.GuiTextAlignment.TEXT_ALIGN_LEFT, + button_style=ButtonStyle.TRANSPARENT_WHITE_TEXT) + self._profile_buttons[p.iccid].set_touch_valid_callback(lambda: self.scroll_panel.is_touch_valid()) + + if not is_comma: + self._rename_buttons[p.iccid] = Button("Rename", partial(self._on_rename_clicked, p.iccid), + button_style=ButtonStyle.LIST_ACTION, font_size=45) + self._rename_buttons[p.iccid].set_touch_valid_callback(lambda: self.scroll_panel.is_touch_valid()) + + if not p.enabled and not is_comma: + self._forget_buttons[p.iccid] = Button("Forget", partial(self._on_forget_clicked, p.iccid), + button_style=ButtonStyle.FORGET_WIFI, font_size=45) + self._forget_buttons[p.iccid].set_touch_valid_callback(lambda: self.scroll_panel.is_touch_valid()) + + if self.state == UIState.SWITCHING or self.state == UIState.DELETING: + self.state = UIState.IDLE + self._state_iccid = None + + def _on_error(self, error: str): + if self._installing_dialog: + gui_app.pop_widget() + self._installing_dialog = None + + self.state = UIState.IDLE + self._state_iccid = None + gui_app.push_widget(alert_dialog(error)) + + def _render(self, rect: rl.Rectangle): + if not self._profiles: + gui_label(rect, "Loading eSIM profiles...", 72, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER) + return + + # Total items: profiles + add button + total_items = len(self._profiles) + 1 + content_rect = rl.Rectangle(rect.x, rect.y, rect.width, total_items * ITEM_HEIGHT) + offset = self.scroll_panel.update(rect, content_rect) + + rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height)) + for i, profile in enumerate(self._profiles): + y_offset = rect.y + i * ITEM_HEIGHT + offset + item_rect = rl.Rectangle(rect.x, y_offset, rect.width, ITEM_HEIGHT) + if not rl.check_collision_recs(item_rect, rect): + continue + + self._draw_profile_item(item_rect, profile) + line_y = int(item_rect.y + item_rect.height - 1) + rl.draw_line(int(item_rect.x), line_y, int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY) + + # Add button at the bottom + add_y = rect.y + len(self._profiles) * ITEM_HEIGHT + offset + add_rect = rl.Rectangle(rect.x + rect.width / 2 - 200, add_y + (ITEM_HEIGHT - 80) / 2, 400, 80) + if rl.check_collision_recs(rl.Rectangle(rect.x, add_y, rect.width, ITEM_HEIGHT), rect): + self._add_button.set_enabled(not self._cellular_manager.busy) + self._add_button.render(add_rect) + + rl.end_scissor_mode() + + def _draw_profile_item(self, rect: rl.Rectangle, profile: Profile): + btn_width = 200 + rename_btn_width = 240 + spacing = 50 + is_comma = self._cellular_manager.is_comma_profile(profile.iccid) + + # Draw comma icon for comma profiles + icon_offset = 0 + if is_comma: + icon = gui_app.texture("icons_mici/settings/comma_icon.png", COMMA_ICON_SIZE, COMMA_ICON_SIZE) + icon_x = rect.x + 20 + icon_y = rect.y + (ITEM_HEIGHT - COMMA_ICON_SIZE) / 2 + rl.draw_texture_v(icon, rl.Vector2(icon_x, icon_y), rl.WHITE) + icon_offset = COMMA_ICON_SIZE + 30 + + ssid_rect = rl.Rectangle(rect.x + icon_offset, rect.y, rect.width - btn_width * 2 - icon_offset, ITEM_HEIGHT) + + status_text = "" + if self.state == UIState.SWITCHING and self._state_iccid == profile.iccid: + self._profile_buttons[profile.iccid].set_enabled(False) + status_text = "SWITCHING..." + elif self.state == UIState.DELETING and self._state_iccid == profile.iccid: + self._profile_buttons[profile.iccid].set_enabled(False) + status_text = "DELETING..." + elif profile.enabled: + pass + else: + self._profile_buttons[profile.iccid].set_enabled(True) + + self._profile_buttons[profile.iccid].render(ssid_rect) + + if status_text: + status_rect = rl.Rectangle(rect.x + rect.width - 410 - spacing, rect.y, 410, ITEM_HEIGHT) + gui_label(status_rect, status_text, font_size=48, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER) + elif profile.enabled: + btn_x = rect.x + rect.width - btn_width - spacing + active_rect = rl.Rectangle(btn_x, rect.y + (ITEM_HEIGHT - 80) / 2, btn_width, 80) + self._active_button.render(active_rect) + if profile.iccid in self._rename_buttons: + rename_rect = rl.Rectangle(btn_x - rename_btn_width - 10, rect.y + (ITEM_HEIGHT - 80) / 2, rename_btn_width, 80) + self._rename_buttons[profile.iccid].render(rename_rect) + elif profile.iccid in self._forget_buttons: + btn_x = rect.x + rect.width - btn_width - spacing + forget_rect = rl.Rectangle(btn_x, rect.y + (ITEM_HEIGHT - 80) / 2, btn_width, 80) + self._forget_buttons[profile.iccid].render(forget_rect) + if profile.iccid in self._rename_buttons: + rename_rect = rl.Rectangle(btn_x - rename_btn_width - 10, rect.y + (ITEM_HEIGHT - 80) / 2, rename_btn_width, 80) + self._rename_buttons[profile.iccid].render(rename_rect) + + def _on_profile_clicked(self, iccid: str): + profile = next((p for p in self._profiles if p.iccid == iccid), None) + if profile is None or profile.enabled: + return + + self.state = UIState.SWITCHING + self._state_iccid = iccid + self._cellular_manager.switch_profile(iccid) + + def _on_rename_clicked(self, iccid: str): + profile = next((p for p in self._profiles if p.iccid == iccid), None) + if profile is None: + return + + current_name = profile.nickname or "" + self.keyboard.reset(min_text_size=0) + self.keyboard.set_title("Enter nickname", f"for \"{_profile_display_name(profile)}\"") + self.keyboard.set_text(current_name) + self.keyboard.set_callback(lambda result: self._on_nickname_entered(iccid, result)) + gui_app.push_widget(self.keyboard) + + def _on_nickname_entered(self, iccid: str, result: DialogResult): + if result == DialogResult.CONFIRM: + nickname = self.keyboard.text.strip() + self._cellular_manager.nickname_profile(iccid, nickname) + + def _on_forget_clicked(self, iccid: str): + profile = next((p for p in self._profiles if p.iccid == iccid), None) + if profile is None: + return + + name = _profile_display_name(profile) + confirm = ConfirmDialog(f"Delete eSIM profile \"{name}\"?", "Delete", "Cancel", + callback=lambda result: self._on_forget_confirmed(iccid, result)) + gui_app.push_widget(confirm) + + def _on_forget_confirmed(self, iccid: str, result: DialogResult): + if result == DialogResult.CONFIRM: + self.state = UIState.DELETING + self._state_iccid = iccid + self._cellular_manager.delete_profile(iccid) + + def _on_add_profile(self): + if not CameraView or not zxingcpp: + gui_app.push_widget(alert_dialog("QR scanning not available on this platform")) + return + + scanner = QRScannerDialog(on_qr_detected=self._on_qr_scanned) + gui_app.push_widget(scanner) + + def _on_qr_scanned(self, lpa_code: str): + self._pending_lpa_code = lpa_code + self.keyboard.reset(min_text_size=0) + self.keyboard.set_title("Enter a nickname for this profile", lpa_code) + self.keyboard.set_text("") + self.keyboard.set_callback(self._on_nickname_for_new_profile) + gui_app.push_widget(self.keyboard) + + def _on_nickname_for_new_profile(self, result: DialogResult): + if result != DialogResult.CONFIRM: + return + + self._pending_nickname = self.keyboard.text.strip() or None + self._installing_dialog = InstallingDialog() + gui_app.push_widget(self._installing_dialog) + + def check_connectivity(): + try: + req = urllib.request.Request("https://openpilot.comma.ai", method="HEAD") + urllib.request.urlopen(req, timeout=2.0) + connected = True + except Exception: + connected = False + self._cellular_manager._callback_queue.append( + lambda: self._cellular_manager.download_profile(self._pending_lpa_code, self._pending_nickname) if connected + else self._on_error("No internet connection.\nConnect to Wi-Fi or cellular to install.") + ) + + threading.Thread(target=check_connectivity, daemon=True).start() diff --git a/system/ui/widgets/network.py b/system/ui/widgets/network.py index e739eef63d7854..d4b6787cf060ea 100644 --- a/system/ui/widgets/network.py +++ b/system/ui/widgets/network.py @@ -5,11 +5,13 @@ import pyray as rl from openpilot.system.ui.lib.application import gui_app from openpilot.system.ui.lib.multilang import tr +from openpilot.system.ui.lib.cellular_manager import CellularManager from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel from openpilot.system.ui.lib.wifi_manager import WifiManager, SecurityType, Network, MeteredType, normalize_ssid from openpilot.system.ui.widgets import DialogResult, Widget from openpilot.system.ui.widgets.button import ButtonStyle, Button from openpilot.system.ui.widgets.confirm_dialog import ConfirmDialog +from openpilot.system.ui.widgets.esim import ESimManagerUI from openpilot.system.ui.widgets.keyboard import Keyboard from openpilot.system.ui.widgets.label import gui_label from openpilot.system.ui.widgets.scroller_tici import Scroller @@ -41,7 +43,8 @@ class PanelType(IntEnum): WIFI = 0 - ADVANCED = 1 + ESIM = 1 + ADVANCED = 2 class UIState(IntEnum): @@ -65,48 +68,81 @@ def _render(self, _): class NetworkUI(Widget): + NAV_BTN_SPACING = 20 + def __init__(self, wifi_manager: WifiManager): super().__init__() self._wifi_manager = wifi_manager self._current_panel: PanelType = PanelType.WIFI + self._cellular_manager = CellularManager() self._wifi_panel = self._child(WifiManagerUI(wifi_manager)) - self._advanced_panel = self._child(AdvancedNetworkSettings(wifi_manager)) - self._nav_button = self._child(NavButton(tr("Advanced"))) - self._nav_button.set_click_callback(self._cycle_panel) + self._esim_panel = self._child(ESimManagerUI(self._cellular_manager)) + self._advanced_panel = self._child(AdvancedNetworkSettings(wifi_manager, self._cellular_manager)) + + self._toggle_button = self._child(NavButton(tr("eSIM"))) + self._toggle_button.set_click_callback(self._toggle_wifi_esim) + + self._advanced_button = self._child(NavButton(tr("Advanced"))) + self._advanced_button.set_click_callback(self._toggle_advanced) def show_event(self): super().show_event() self._set_current_panel(PanelType.WIFI) + self._cellular_manager.refresh_profiles() - def _cycle_panel(self): - if self._current_panel == PanelType.WIFI: - self._set_current_panel(PanelType.ADVANCED) + def _toggle_wifi_esim(self): + if self._current_panel == PanelType.ESIM: + self._set_current_panel(PanelType.WIFI) else: + self._set_current_panel(PanelType.ESIM) + + def _toggle_advanced(self): + if self._current_panel == PanelType.ADVANCED: self._set_current_panel(PanelType.WIFI) + else: + self._set_current_panel(PanelType.ADVANCED) def _render(self, _): - # subtract button - content_rect = rl.Rectangle(self._rect.x, self._rect.y + self._nav_button.rect.height + 40, - self._rect.width, self._rect.height - self._nav_button.rect.height - 40) + btn_h = self._toggle_button.rect.height + content_rect = rl.Rectangle(self._rect.x, self._rect.y + btn_h + 40, + self._rect.width, self._rect.height - btn_h - 40) + + # Toggle button: shows the other option (Wi-Fi / eSIM) + if self._current_panel == PanelType.ESIM: + self._toggle_button.text = tr("Wi-Fi") + else: + self._toggle_button.text = tr("eSIM") + + # Advanced button + if self._current_panel == PanelType.ADVANCED: + self._advanced_button.text = tr("Back") + else: + self._advanced_button.text = tr("Advanced") + + # Position: toggle on left of advanced, advanced on right + self._advanced_button.set_position(self._rect.x + self._rect.width - self._advanced_button.rect.width, self._rect.y + 20) + self._toggle_button.set_position(self._advanced_button.rect.x - self._toggle_button.rect.width - self.NAV_BTN_SPACING, self._rect.y + 20) + + # Render panel if self._current_panel == PanelType.WIFI: - self._nav_button.text = tr("Advanced") - self._nav_button.set_position(self._rect.x + self._rect.width - self._nav_button.rect.width, self._rect.y + 20) self._wifi_panel.render(content_rect) + elif self._current_panel == PanelType.ESIM: + self._esim_panel.render(content_rect) else: - self._nav_button.text = tr("Back") - self._nav_button.set_position(self._rect.x, self._rect.y + 20) self._advanced_panel.render(content_rect) - self._nav_button.render() + self._toggle_button.render() + self._advanced_button.render() def _set_current_panel(self, panel: PanelType): self._current_panel = panel class AdvancedNetworkSettings(Widget): - def __init__(self, wifi_manager: WifiManager): + def __init__(self, wifi_manager: WifiManager, cellular_manager: CellularManager | None = None): super().__init__() self._wifi_manager = wifi_manager + self._cellular_manager = cellular_manager self._wifi_manager.add_callbacks(networks_updated=self._on_network_updated) self._params = Params() @@ -144,7 +180,8 @@ def __init__(self, wifi_manager: WifiManager): items: list[Widget] = [ tethering_btn, tethering_password_btn, - text_item(lambda: tr("IP Address"), lambda: self._wifi_manager.ipv4_address), + text_item(lambda: tr("WiFi IP Address"), lambda: self._wifi_manager.ipv4_address), + text_item(lambda: tr("Modem IP Address"), lambda: self._cellular_manager.modem_ip if self._cellular_manager else ""), self._roaming_btn, self._apn_btn, self._cellular_metered_btn, diff --git a/uv.lock b/uv.lock index 272421934ce94b..5de7e4cc8024de 100644 --- a/uv.lock +++ b/uv.lock @@ -840,6 +840,7 @@ dependencies = [ { name = "zeromq" }, { name = "zstandard" }, { name = "zstd" }, + { name = "zxing-cpp" }, ] [package.optional-dependencies] @@ -935,6 +936,7 @@ requires-dist = [ { name = "zeromq", git = "https://github.com/commaai/dependencies.git?subdirectory=zeromq&rev=release-zeromq" }, { name = "zstandard" }, { name = "zstd", git = "https://github.com/commaai/dependencies.git?subdirectory=zstd&rev=release-zstd" }, + { name = "zxing-cpp", specifier = "==3.0.0" }, ] provides-extras = ["docs", "testing", "dev", "tools"] @@ -1703,3 +1705,17 @@ wheels = [ name = "zstd" version = "1.5.6" source = { git = "https://github.com/commaai/dependencies.git?subdirectory=zstd&rev=release-zstd#b2b10636beba0384eada30979651b4ca7cf919ff" } + +[[package]] +name = "zxing-cpp" +version = "3.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f1/c6/ac2a12cdc2b1c296804fc6a65bf112b607825ca7f47742a5aca541134711/zxing_cpp-3.0.0.tar.gz", hash = "sha256:703353304de24d947bd68044fac4e062953a7b64029de6941ba8ffeb4476b60d", size = 1197544, upload-time = "2026-02-10T12:50:11.252Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/7f/32b4cc8545da72061d360aca9d96c51738d48e2f3a8eebe06a47f4103dd6/zxing_cpp-3.0.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:8b76fac77c94545c5a6e2e6184a121c09409fff29f9c7557e350c16b78025d74", size = 914798, upload-time = "2026-02-10T12:49:43.556Z" }, + { url = "https://files.pythonhosted.org/packages/df/21/5ba18d19383fe5f044fefa79640f4234665bc77057cf3d584e5eb979685f/zxing_cpp-3.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0bf58043c543d3440f1cbef6bfa9e5ad7139c39c90955d1f294f4778f0cd1ec0", size = 867437, upload-time = "2026-02-10T12:49:45.424Z" }, + { url = "https://files.pythonhosted.org/packages/8a/2a/94d98c5b728e1dfeec3a343f2581bf7f372ca448cefff50076cab0c6e0c4/zxing_cpp-3.0.0-cp312-cp312-manylinux_2_26_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:548cc0e767f24193038031c76f60f2de0965ab5b05106dff6095bcae89607748", size = 995650, upload-time = "2026-02-10T12:49:47.222Z" }, + { url = "https://files.pythonhosted.org/packages/39/0f/03f09d048b7dde279a5bed8839ffbb21f7e8995747afa17970791c0356ff/zxing_cpp-3.0.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:cfdf7a393541f4cd7c7c9329ec5d56b49a5cfc91bf24cdc53ec301d41c2afd68", size = 1074289, upload-time = "2026-02-10T12:49:48.804Z" }, + { url = "https://files.pythonhosted.org/packages/1b/74/b8ad4faba8798991ef710107758fc7df36fb0f6b9ff6092f89e3039d1381/zxing_cpp-3.0.0-cp312-cp312-win32.whl", hash = "sha256:5193b305f317eed0e55963e33ccac33c6ffa12a225bc68c6363ae62ae49a10a3", size = 828198, upload-time = "2026-02-10T12:49:50.615Z" }, + { url = "https://files.pythonhosted.org/packages/99/35/4ad2b3731fd03b245dd3987bd8b16e1b8d9ae0f60d178eecc73b1c6fa19c/zxing_cpp-3.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:e92d64ab213bb9be0734ad10a5409ae258eb413e0569d8a6d61d483558ab16e6", size = 909267, upload-time = "2026-02-10T12:49:52.022Z" }, +]