diff --git a/esp_flasher/gui.py b/esp_flasher/gui.py index b2626ef..b0a77a9 100644 --- a/esp_flasher/gui.py +++ b/esp_flasher/gui.py @@ -2,20 +2,430 @@ import sys import threading import os +import logging import platform +import serial +import html from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, QComboBox, QFileDialog, QTextEdit, QGroupBox, QGridLayout, - QLineEdit) + QLineEdit, QDialog, QListWidget, QListWidgetItem, + QProgressBar, QMessageBox) from PyQt6.QtGui import QColor, QPalette -from PyQt6.QtCore import pyqtSignal, QObject, Qt, QSettings +from PyQt6.QtCore import pyqtSignal, QObject, Qt, QSettings, QTimer from esp_flasher.own_esptool import get_port_list, colorize, COLOR_RED, COLOR_GREEN, COLOR_CYAN, COLOR_YELLOW from esp_flasher.const import (__version__, DEFAULT_WINDOW_WIDTH, DEFAULT_WINDOW_HEIGHT, DEFAULT_WINDOW_X, DEFAULT_WINDOW_Y) from esp_flasher.console_color import ColoredConsole +from esp_flasher.serial_console import SerialReader + +logger = logging.getLogger(__name__) + +class DeviceInfoDialog(QDialog): + """Attractive dialog to display device information.""" + + def __init__(self, device_info, parent=None): + super().__init__(parent) + self.setWindowTitle("Device Information") + self.setMinimumWidth(400) + self.setMinimumHeight(250) + self._device_info = device_info + self._init_ui() + + def _init_ui(self): + layout = QVBoxLayout() + layout.setSpacing(15) + layout.setContentsMargins(20, 20, 20, 20) + + # Title + title = QLabel("Device Information") + title.setStyleSheet(""" + QLabel { + font-size: 18px; + font-weight: bold; + color: white; + padding: 10px; + } + """) + layout.addWidget(title) + + # Info card container + card = QWidget() + card.setStyleSheet(""" + QWidget { + border-radius: 8px; + padding: 15px; + } + """) + card_layout = QVBoxLayout() + card_layout.setSpacing(12) + + labels = ["Firmware", "Version", "Chip", "Name"] + + for i, val in enumerate(self._device_info): + if val and i < len(labels): + info_row = QLabel(f"{labels[i]}: {html.escape(val)}") + info_row.setStyleSheet(""" + QLabel { + font-size: 13px; + padding: 8px; + border-radius: 4px; + border-left: 3px; + } + """) + info_row.setWordWrap(True) + info_row.setTextFormat(Qt.TextFormat.RichText) + card_layout.addWidget(info_row) + + card.setLayout(card_layout) + layout.addWidget(card) + + layout.addStretch() + + # Close button + close_btn = QPushButton("Close") + close_btn.setStyleSheet(""" + QPushButton { + padding: 10px 20px; + font-size: 13px; + border-radius: 4px; + font-weight: bold; + border: 1px solid #cccccc; + } + QPushButton:hover { + background-color: #2d5016; + color: white; + border: 1px solid #2d5016; + } + QPushButton:pressed { + background-color: #1f3a0f; + color: white; + border: 1px solid #1f3a0f; + } + """) + close_btn.clicked.connect(self.accept) + layout.addWidget(close_btn) + + self.setLayout(layout) + + +class ImprovDialog(QDialog): + """Dialog for Improv WiFi provisioning.""" + _scan_finished = pyqtSignal(list) # thread-safe signal for scan results + _provision_failed_signal = pyqtSignal() # thread-safe signal for provision failure + + def __init__(self, serial_port, parent=None): + super().__init__(parent) + self.setWindowTitle("Improv WiFi Provisioning") + self.setMinimumWidth(420) + self._serial_port = serial_port # reuse already-open port (never close it) + self._improv = None + self._is_provisioning = False # True only after we send credentials + self._scan_finished.connect(self._update_network_list) + self._provision_failed_signal.connect(self._provision_failed) + self._init_ui() + self._start_improv() + + def _init_ui(self): + layout = QVBoxLayout() + + # Device info + info_container = QHBoxLayout() + self.info_label = QLabel("Detecting Improv device...") + self.info_label.setWordWrap(True) + info_container.addWidget(self.info_label) + self.info_btn = QPushButton("Details") + self.info_btn.setVisible(False) + self.info_btn.setMaximumWidth(100) + self.info_btn.setStyleSheet(""" + QPushButton { + padding: 5px 10px; + border-radius: 4px; + font-weight: bold; + border: 1px solid #cccccc; + } + QPushButton:hover { + background-color: #2d5016; + color: white; + border: 1px solid #2d5016; + } + QPushButton:pressed { + background-color: #1f3a0f; + color: white; + border: 1px solid #1f3a0f; + } + """) + self.info_btn.clicked.connect(self._show_device_info_dialog) + info_container.addWidget(self.info_btn) + layout.addLayout(info_container) + + # WiFi network list + net_group = QGroupBox("WiFi Networks") + net_layout = QVBoxLayout() + self.network_list = QListWidget() + self.network_list.itemDoubleClicked.connect(self._on_network_selected) + net_layout.addWidget(self.network_list) + + self.scan_btn = QPushButton("Scan Networks") + self.scan_btn.clicked.connect(self._scan_networks) + net_layout.addWidget(self.scan_btn) + net_group.setLayout(net_layout) + layout.addWidget(net_group) + + # Credentials + cred_group = QGroupBox("WiFi Credentials") + cred_layout = QGridLayout() + cred_layout.addWidget(QLabel("SSID:"), 0, 0) + self.ssid_input = QLineEdit() + cred_layout.addWidget(self.ssid_input, 0, 1) + cred_layout.addWidget(QLabel("Password:"), 1, 0) + self.password_input = QLineEdit() + self.password_input.setEchoMode(QLineEdit.EchoMode.Password) + cred_layout.addWidget(self.password_input, 1, 1) + cred_group.setLayout(cred_layout) + layout.addWidget(cred_group) + + # Status / progress + self.status_label = QLabel("") + layout.addWidget(self.status_label) + self.progress = QProgressBar() + self.progress.setRange(0, 0) # indeterminate + self.progress.setVisible(False) + layout.addWidget(self.progress) + + # Buttons + btn_layout = QHBoxLayout() + self.provision_btn = QPushButton("Provision") + self.provision_btn.clicked.connect(self._provision) + self.provision_btn.setEnabled(False) + btn_layout.addWidget(self.provision_btn) + + close_btn = QPushButton("Close") + close_btn.clicked.connect(self.close) + btn_layout.addWidget(close_btn) + layout.addLayout(btn_layout) + + self.setLayout(layout) + + def _start_improv(self): + """Start Improv on the already-open serial port.""" + if not self._serial_port or not self._serial_port.is_open: + self.status_label.setText("Serial port not open") + return + + # Drain & discard stale console data so the Improv state-machine + # starts clean. The read-loop also handles stale bytes via its + # newline-reset logic, so this is belt-and-suspenders. + try: + self._serial_port.reset_input_buffer() + except (OSError, serial.SerialException) as e: + self.status_label.setText(f"Port error: {e}") + return + + from esp_flasher.improv import ImprovManager + self._improv = ImprovManager(self._serial_port) + self._improv.state_changed.connect(self._on_state_changed) + self._improv.error_received.connect(self._on_error) + self._improv.device_info_received.connect(self._on_device_info) + self._improv.log_message.connect(self._on_log) + self._improv.provisioned.connect(self._on_provisioned) + self._improv.start() + + # Periodically poll for device until it responds + self._detect_attempts = 0 + self._detect_timer = QTimer(self) + self._detect_timer.timeout.connect(self._poll_device_state) + self._detect_timer.start(1000) + # Also send first request immediately + QTimer.singleShot(100, self._poll_device_state) + + def _poll_device_state(self): + """Send request_current_state until device responds or we give up.""" + if not self._improv: + self._detect_timer.stop() + return + if self._improv.device_state is not None: + # Device has responded, stop polling + self._detect_timer.stop() + return + self._detect_attempts += 1 + if self._detect_attempts > 15: + self._detect_timer.stop() + self.status_label.setText("No Improv device detected (timeout)") + return + self.status_label.setText( + f"Detecting Improv device... (attempt {self._detect_attempts}/15)" + ) + self._improv.request_current_state() + + def _on_state_changed(self, state): + from esp_flasher.improv import STATE_READY, STATE_PROVISIONING, STATE_PROVISIONED, STATE_NAMES + name = STATE_NAMES.get(state, f"Unknown ({state})") + self.progress.setVisible(False) + if state == STATE_READY: + self.status_label.setText(f"State: {name}") + self.provision_btn.setEnabled(True) + # Auto-request device info + improv = self._improv + threading.Thread(target=lambda: self._request_info_bg(improv), daemon=True).start() + elif state == STATE_PROVISIONING: + self.status_label.setText("Connecting to WiFi...") + self.provision_btn.setEnabled(False) + self.progress.setVisible(True) + elif state == STATE_PROVISIONED: + if self._is_provisioning: + self.status_label.setText("✓ WiFi provisioned successfully!") + self.provision_btn.setEnabled(False) + else: + self.status_label.setText("Device already connected to WiFi") + self.provision_btn.setEnabled(True) + # Request device info in both cases + improv = self._improv + threading.Thread(target=lambda: self._request_info_bg(improv), daemon=True).start() + + def _on_error(self, error): + from esp_flasher.improv import ERROR_NAMES, ERROR_NONE + if error != ERROR_NONE: + self.status_label.setText(f"Error: {ERROR_NAMES.get(error, 'Unknown')}") + self.progress.setVisible(False) + self.provision_btn.setEnabled(True) + + def _on_device_info(self, info): + self._device_info = info # Store for later display + + if not info or not any(info): + self.info_label.setText("Device detected") + self.info_btn.setVisible(False) + return + + # Show compact summary + name = info[3] if len(info) > 3 and info[3] else "Unknown" + chip = info[2] if len(info) > 2 and info[2] else "Unknown" + self.info_label.setText(f"{name} ({chip})") + self.info_label.setStyleSheet(""" + QLabel { + font-size: 15px; + font-weight: bold; + color: white; + padding: 0px; + } + """) + self.info_btn.setVisible(True) + + def _show_device_info_dialog(self): + """Open the detailed device info dialog.""" + if hasattr(self, '_device_info') and self._device_info: + dialog = DeviceInfoDialog(self._device_info, self) + dialog.exec() + + def _on_log(self, msg): + """Show Improv status messages in the dialog's status label.""" + self.status_label.setText(msg) + + def _on_provisioned(self, result): + self.progress.setVisible(False) + self.provision_btn.setEnabled(False) + msg = "✓ WiFi provisioned successfully!" + if result: + msg += f"\n{', '.join(result)}" + self.status_label.setText(msg) + + def _on_network_selected(self, item): + ssid = item.data(Qt.ItemDataRole.UserRole) + if ssid: + self.ssid_input.setText(ssid) + self.password_input.setFocus() + + def _scan_networks(self): + if getattr(self, '_scan_in_progress', False): + return + if not self._improv: + self.status_label.setText("Improv not initialized") + return + self._scan_in_progress = True + self.scan_btn.setEnabled(False) + self.network_list.clear() + self.status_label.setText("Scanning WiFi networks...") + self.progress.setVisible(True) + improv = self._improv + threading.Thread(target=lambda: self._scan_bg(improv), daemon=True).start() + + def _scan_bg(self, improv): + try: + networks = improv.request_wifi_networks() + # Sort by RSSI descending + networks.sort(key=lambda n: n[1], reverse=True) + # Thread-safe: emit signal to update UI on main thread + self._scan_finished.emit(networks) + except Exception as e: + logger.error("WiFi scan error: %s", e) + # Emit empty list to reset UI + self._scan_finished.emit([]) + + def _update_network_list(self, networks): + self._scan_in_progress = False + self.scan_btn.setEnabled(True) + self.network_list.clear() + self.progress.setVisible(False) + if not networks: + self.status_label.setText("No networks found") + return + for ssid, rssi, secured in networks: + lock = "🔒 " if secured else " " + item = QListWidgetItem(f"{lock}{ssid} ({rssi} dBm)") + item.setData(Qt.ItemDataRole.UserRole, ssid) + self.network_list.addItem(item) + self.status_label.setText(f"Found {len(networks)} networks") + + def _request_info_bg(self, improv): + try: + improv.request_device_info() + except Exception as e: + logger.error("Device info request error: %s", e) + + def _provision(self): + ssid = self.ssid_input.text().strip() + password = self.password_input.text() + if not ssid: + self.status_label.setText("Please enter an SSID") + return + if not self._improv: + self.status_label.setText("Improv not initialized") + return + self._is_provisioning = True + self.provision_btn.setEnabled(False) + self.progress.setVisible(True) + self.status_label.setText(f"Provisioning WiFi: {ssid}...") + improv = self._improv + threading.Thread(target=lambda: self._provision_bg(improv, ssid, password), daemon=True).start() + + def _provision_bg(self, improv, ssid, password): + try: + result = improv.send_wifi_settings(ssid, password) + if result is None: + self._provision_failed_signal.emit() + except Exception as e: + logger.error("Provisioning error: %s", e) + self._provision_failed_signal.emit() + + def _provision_failed(self): + self._is_provisioning = False + self.progress.setVisible(False) + self.provision_btn.setEnabled(True) + self.status_label.setText("WiFi provisioning failed") + + def closeEvent(self, event): + if hasattr(self, '_detect_timer') and self._detect_timer.isActive(): + self._detect_timer.stop() + if self._improv: + self._improv.stop() + self._improv = None + # Do NOT close the serial port — caller owns it and will restart console reader + super().closeEvent(event) + class FlashingThread(threading.Thread): def __init__(self, firmware, port, finished=None, failed=None): @@ -118,6 +528,9 @@ def init_ui(self): self.flash_button = QPushButton("Flash ESP") self.flash_button.clicked.connect(self.flash_esp) actions_layout.addWidget(self.flash_button) + self.improv_button = QPushButton("Improv WiFi") + self.improv_button.clicked.connect(self.open_improv) + actions_layout.addWidget(self.improv_button) actions_group_box.setLayout(actions_layout) # Console with input field @@ -237,9 +650,6 @@ def connect_to_port(self): # Start serial communication try: - import serial - from esp_flasher.serial_console import SerialReader - self._serial_port = serial.Serial(self._port, baudrate=115200, timeout=1) # Start reader thread @@ -321,6 +731,45 @@ def _start_flash_worker(self): ) self._flash_worker.start() + def open_improv(self): + """Open Improv WiFi provisioning dialog. + stop console reader, pass open port to Improv, restart reader on close.""" + if self._is_flashing: + self.show_log_error("Cannot use Improv while flashing") + return + if not self._serial_port or not self._serial_port.is_open: + self.show_log_error("Connect to a serial port first") + return + + # Stop console reader — mute signals first to prevent cross-thread + # queued events from being delivered, then stop thread, then disconnect + # and flush the Qt event queue so no stale events remain. + if self._serial_reader: + self._serial_reader.stop() # sets _muted=True and running=False, joins thread + self._serial_reader.line_received.disconnect(self.append_log_line) + self._serial_reader.error_occurred.disconnect(self.handle_serial_error) + # Flush any already-queued cross-thread events so they are discarded + from PyQt6.QtWidgets import QApplication + QApplication.processEvents() + self._serial_reader = None + + # Disable console input while in Improv mode + self.input_field.setEnabled(False) + self.send_button.setEnabled(False) + + # Open Improv dialog with the same open port (port stays open, no ESP reset) + dlg = ImprovDialog(self._serial_port, parent=self) + dlg.exec() + + # Restart console reader on the same open port (like JS reconnectConsole) + if self._serial_port and self._serial_port.is_open: + self._serial_reader = SerialReader(self._serial_port) + self._serial_reader.line_received.connect(self.append_log_line) + self._serial_reader.error_occurred.connect(self.handle_serial_error) + self._serial_reader.start() + self.input_field.setEnabled(True) + self.send_button.setEnabled(True) + def on_flash_finished(self): """Called when flashing is complete""" print(colorize("\nFlashing complete!", COLOR_GREEN)) diff --git a/esp_flasher/improv.py b/esp_flasher/improv.py new file mode 100644 index 0000000..31cbbb9 --- /dev/null +++ b/esp_flasher/improv.py @@ -0,0 +1,455 @@ +""" +Improv Wi-Fi Serial Protocol implementation for ESP-Flasher. +""" + +import time +import threading +import logging + +from PyQt6.QtCore import pyqtSignal, QObject + +logger = logging.getLogger(__name__) + +# Improv packet header +IMPROV_HEADER = b"IMPROV" +IMPROV_VERSION = 0x01 + +# Message types +TYPE_CURRENT_STATE = 0x01 +TYPE_ERROR_STATE = 0x02 +TYPE_RPC = 0x03 +TYPE_RPC_RESULT = 0x04 + +# Device states +STATE_READY = 0x02 +STATE_PROVISIONING = 0x03 +STATE_PROVISIONED = 0x04 + +STATE_NAMES = { + STATE_READY: "Ready", + STATE_PROVISIONING: "Provisioning", + STATE_PROVISIONED: "Provisioned", +} + +# Error codes +ERROR_NONE = 0x00 +ERROR_INVALID_RPC = 0x01 +ERROR_UNKNOWN_RPC = 0x02 +ERROR_UNABLE_TO_CONNECT = 0x03 +ERROR_TIMEOUT = 0xFE +ERROR_UNKNOWN = 0xFF + +ERROR_NAMES = { + ERROR_NONE: "No error", + ERROR_INVALID_RPC: "Invalid RPC packet", + ERROR_UNKNOWN_RPC: "Unknown RPC command", + ERROR_UNABLE_TO_CONNECT: "Unable to connect", + ERROR_TIMEOUT: "Timeout", + ERROR_UNKNOWN: "Unknown error", +} + +# RPC commands +CMD_SEND_WIFI_SETTINGS = 0x01 +CMD_REQUEST_CURRENT_STATE = 0x02 +CMD_REQUEST_INFO = 0x03 +CMD_REQUEST_WIFI_NETWORKS = 0x04 + +PROVISION_TIMEOUT = 30.0 + + +def _build_packet(msg_type, data): + """Build an Improv serial packet.""" + payload = bytearray(IMPROV_HEADER) + payload.append(IMPROV_VERSION) + payload.append(msg_type) + payload.append(len(data)) + payload.extend(data) + checksum = sum(payload) & 0xFF + payload.append(checksum) + payload.append(0x0A) + return bytes(payload) + + +def _build_rpc(command, payload=b""): + """Build an RPC packet with the given command and payload.""" + data = bytearray() + data.append(command) + data.append(len(payload)) + data.extend(payload) + return _build_packet(TYPE_RPC, data) + + +def _build_wifi_payload(ssid, password): + """Build the WiFi settings payload (TLV encoded SSID + password).""" + ssid_bytes = ssid.encode("utf-8") + pw_bytes = password.encode("utf-8") + payload = bytearray() + payload.append(len(ssid_bytes)) + payload.extend(ssid_bytes) + payload.append(len(pw_bytes)) + payload.extend(pw_bytes) + return bytes(payload) + + +def _parse_tlv_strings(data): + """Parse TLV-encoded strings from RPC result data.""" + if len(data) < 2: + return [] + total_length = data[1] + strings = [] + idx = 2 + while idx < 2 + total_length and idx < len(data): + str_len = data[idx] + idx += 1 + if idx + str_len > len(data): + break + s = bytes(data[idx:idx + str_len]).decode("utf-8", errors="replace") + strings.append(s) + idx += str_len + return strings + + +class ImprovManager(QObject): + """Manages Improv serial communication with an ESP device. + + Receiver state machine: + - is_improv=None (scanning: accumulate bytes, check at 9 bytes) + - is_improv=True (reading improv packet body + checksum) + - is_improv=False (skip non-improv line until newline) + """ + + # Signals + state_changed = pyqtSignal(int) # device state + error_received = pyqtSignal(int) # error code + device_info_received = pyqtSignal(list) # [firmware, version, chip, name] + wifi_networks_received = pyqtSignal(list) # list of (ssid, rssi, secured) + provisioned = pyqtSignal(list) # result strings (e.g. redirect URL) + log_message = pyqtSignal(str) # log/status messages + + def __init__(self, serial_port): + super().__init__() + self._port = serial_port + self._running = False + self._thread = None + self._receiver_dead = False # Flag to indicate receiver thread has exited + + # Receiver state machine + self._line = [] + self._is_improv = None # None=scanning, True=reading packet, False=skip line + self._improv_length = 0 + + # RPC response synchronization + self._rpc_lock = threading.Lock() # Serialize RPC calls + self._rpc_event = threading.Event() + self._rpc_result = None + self._rpc_error = None + self._rpc_command = None + + # WiFi network scan accumulator + self._wifi_networks = [] + self._wifi_scan_done = threading.Event() + + # Current device state + self.device_state = None + + def start(self): + """Start the Improv receiver thread.""" + self._running = True + self._receiver_dead = False + self._thread = threading.Thread(target=self._read_loop, daemon=True) + self._thread.start() + + def stop(self): + """Stop the Improv receiver thread.""" + self._running = False + self._receiver_dead = True + # Unblock any threads waiting on RPC responses or WiFi scan + self._rpc_event.set() + self._wifi_scan_done.set() + if self._thread: + self._thread.join(timeout=3.0) + self._thread = None + + # --- Public API --- + + def request_current_state(self): + """Request the current state from the device.""" + pkt = _build_rpc(CMD_REQUEST_CURRENT_STATE) + self._write(pkt) + + def request_device_info(self, timeout=10.0): + """Request device info. Returns [firmware, version, chip, name] or None. + Uses a 10s timeout to prevent indefinite blocking.""" + return self._send_rpc(CMD_REQUEST_INFO, timeout=timeout) + + def request_wifi_networks(self, timeout=30.0): + """Scan for WiFi networks. Returns list of (ssid, rssi, secured).""" + if not self._running or not self._port or not self._port.is_open: + return [] + if self._receiver_dead: + raise RuntimeError("Improv receiver thread has died") + + # Hold lock across entire send-and-wait sequence to prevent race conditions + self._rpc_lock.acquire() + try: + self._wifi_networks = [] + self._wifi_scan_done.clear() + self._rpc_command = CMD_REQUEST_WIFI_NETWORKS + pkt = _build_rpc(CMD_REQUEST_WIFI_NETWORKS) + self._write(pkt) + + # Wait for scan completion while holding lock + if not self._wifi_scan_done.wait(timeout=timeout): + if self._receiver_dead: + raise RuntimeError("Improv receiver thread died during WiFi scan") + self.log_message.emit("WiFi scan timeout") + self._rpc_command = None + return list(self._wifi_networks) + finally: + # Release lock after wait completes or times out + self._rpc_lock.release() + + def send_wifi_settings(self, ssid, password, timeout=PROVISION_TIMEOUT): + """Send WiFi credentials. Returns result strings or None on error. + 30s timeout — (ssid, password, 30000).""" + payload = _build_wifi_payload(ssid, password) + return self._send_rpc(CMD_SEND_WIFI_SETTINGS, payload, timeout=timeout) + + # --- Internal --- + + def _send_rpc(self, command, payload=b"", timeout=None): + """Send an RPC command and wait for the result. + timeout=None means wait indefinitely. + Serializes RPC calls to prevent concurrent RPCs from racing.""" + if not self._running or not self._port or not self._port.is_open: + return None + if self._receiver_dead: + raise RuntimeError("Improv receiver thread has died") + + # Hold lock across entire send-and-wait sequence to prevent race conditions + self._rpc_lock.acquire() + try: + # Check again after acquiring lock + if self._receiver_dead: + raise RuntimeError("Improv receiver thread has died") + + self._rpc_event.clear() + self._rpc_result = None + self._rpc_error = None + self._rpc_command = command + + pkt = _build_rpc(command, payload) + self._write(pkt) + + # Wait for response while holding the lock + if self._rpc_event.wait(timeout=timeout): + if self._receiver_dead: + raise RuntimeError("Improv receiver thread died during RPC") + if self._rpc_error is not None and self._rpc_error != ERROR_NONE: + self._rpc_command = None + return None + result = self._rpc_result + self._rpc_command = None + return result + if self._receiver_dead: + raise RuntimeError("Improv receiver thread died during RPC") + self.log_message.emit("RPC timeout") + self.error_received.emit(ERROR_TIMEOUT) + self._rpc_command = None + return None + finally: + # Release lock after wait completes or times out + self._rpc_lock.release() + + def _write(self, data): + """Write data to the serial port.""" + try: + if self._port and self._port.is_open: + self._port.write(data) + self._port.flush() + else: + # Port not available — unblock any waiters so they don't hang + self._rpc_event.set() + self._wifi_scan_done.set() + except Exception as e: + logger.error("Improv write error: %s", e) + self.log_message.emit(f"Write error: {e}") + # Unblock any threads waiting for a response that will never come + self._rpc_event.set() + self._wifi_scan_done.set() + + def _read_loop(self): + """Background thread: read bytes and detect Improv packets. + Uses blocking read(1) instead of in_waiting polling — avoids macOS issues + where in_waiting may return 0 after tcflush/reset_input_buffer.""" + # Use short timeout so stop() isn't blocked for too long + old_timeout = self._port.timeout + try: + self._port.timeout = 0.1 + except Exception as e: + logger.debug("Could not set serial timeout: %s", e) + try: + while self._running: + try: + if not self._port or not self._port.is_open: + self.log_message.emit("Serial port closed") + self._running = False + self._rpc_event.set() + self._wifi_scan_done.set() + break + # Blocking read — returns 1 byte or b'' on timeout + raw = self._port.read(1) + if raw: + # Got one byte; also grab anything else already buffered + try: + remaining = self._port.in_waiting + if remaining > 0: + raw += self._port.read(remaining) + except OSError: + pass + for b in raw: + self._process_byte(b) + except Exception as e: + if self._running: + logger.error("Improv read error: %s", e) + self.log_message.emit(f"Read error: {e}") + self._running = False + self._rpc_event.set() + self._wifi_scan_done.set() + break + finally: + # Mark receiver as dead and wake all waiters + self._receiver_dead = True + self._rpc_event.set() + self._wifi_scan_done.set() + try: + self._port.timeout = old_timeout + except Exception as e: + logger.debug("Could not restore serial timeout: %s", e) + + def _process_byte(self, byte): + """Process a single byte.""" + + # State: is_improv=False → skip non-improv line until newline + if self._is_improv is False: + if byte == 0x0A: + self._is_improv = None + return + + # State: is_improv=True → collecting improv packet body + if self._is_improv is True: + self._line.append(byte) + if len(self._line) == self._improv_length: + self._handle_packet(self._line) + self._is_improv = None + self._line = [] + return + + # State: is_improv=None → scanning for header + + # Newline resets accumulation + if byte == 0x0A: + self._line = [] + return + + self._line.append(byte) + + # Only check once we have exactly 9 bytes + if len(self._line) != 9: + return + + # Check if first 6 bytes match "IMPROV" + if bytes(self._line[:6]) == IMPROV_HEADER: + # Header matched — calculate total packet length + data_len = self._line[8] + self._improv_length = 9 + data_len + 1 # 9 header + data + checksum + self._is_improv = True + else: + # Not an Improv header — discard buffer and skip rest of line + self._line = [] + self._is_improv = False + + def _handle_packet(self, line): + """Handle a complete Improv packet (including header + checksum).""" + # Checksum: sum of all bytes except the last one (the checksum itself) + calculated = sum(line[:-1]) & 0xFF + received = line[-1] + if calculated != received: + logger.warning("Improv checksum mismatch: expected %02X, got %02X", + calculated, received) + return + + # Parse fields after the 6-byte "IMPROV" prefix + version = line[6] + if version != IMPROV_VERSION: + return + + msg_type = line[7] + data_len = line[8] + data = line[9:9 + data_len] + + if msg_type == TYPE_CURRENT_STATE: + if data: + self.device_state = data[0] + state_name = STATE_NAMES.get(data[0], f"Unknown ({data[0]:#x})") + self.log_message.emit(f"Device state: {state_name}") + self.state_changed.emit(data[0]) + + elif msg_type == TYPE_ERROR_STATE: + if data: + error = data[0] + error_name = ERROR_NAMES.get(error, f"Unknown ({error:#x})") + if error != ERROR_NONE: + self.log_message.emit(f"Device error: {error_name}") + self.error_received.emit(error) + # Only wake the waiter if there's a pending RPC command + if error != ERROR_NONE and self._rpc_command is not None: + self._rpc_error = error + self._rpc_event.set() + + elif msg_type == TYPE_RPC_RESULT: + if not data: + return + command = data[0] + strings = _parse_tlv_strings(data) + + # Verify result matches pending RPC command to prevent race conditions + if self._rpc_command is not None and command != self._rpc_command: + logger.warning( + "Ignoring RPC result for command %d (expected %d)", + command, self._rpc_command + ) + return + + if command == CMD_REQUEST_INFO: + self.device_info_received.emit(strings) + if self._rpc_command == command: + self._rpc_result = strings + self._rpc_event.set() + + elif command == CMD_REQUEST_WIFI_NETWORKS: + if not strings: + # Empty result = scan complete + self.log_message.emit(f"WiFi scan complete: {len(self._wifi_networks)} networks") + self._wifi_scan_done.set() + else: + if len(strings) >= 3: + try: + rssi = int(strings[1]) + except (ValueError, IndexError): + rssi = 0 + self._wifi_networks.append( + (strings[0], rssi, strings[2] == "YES") + ) + + elif command == CMD_SEND_WIFI_SETTINGS: + self.log_message.emit(f"Provisioned: {strings}") + self.provisioned.emit(strings) + if self._rpc_command == command: + self._rpc_result = strings + self._rpc_event.set() + + else: + if self._rpc_command == command: + self._rpc_result = strings + self._rpc_event.set() diff --git a/esp_flasher/serial_console.py b/esp_flasher/serial_console.py index 6d302a9..6f22aaf 100644 --- a/esp_flasher/serial_console.py +++ b/esp_flasher/serial_console.py @@ -28,6 +28,7 @@ def __init__(self, serial_port): super().__init__() self.serial_port = serial_port self.running = False + self._muted = False # When True, suppress all signal emissions self.thread = None # Use incremental decoder for proper UTF-8 handling import codecs @@ -41,9 +42,11 @@ def start(self): def stop(self): """Stop reading from serial port""" + self._muted = True # Suppress signals immediately, before thread winds down self.running = False if self.thread: self.thread.join(timeout=1.0) + self.thread = None def _read_loop(self): """Read loop running in background thread""" @@ -60,7 +63,7 @@ def _read_loop(self): buffer += text # Process complete lines (ending with \n or \r) - while '\n' in buffer or '\r' in buffer: + while self.running and ('\n' in buffer or '\r' in buffer): # Find the first line ending idx_n = buffer.find('\n') idx_r = buffer.find('\r') @@ -101,6 +104,8 @@ def _read_loop(self): else: break except serial.SerialException as e: + if self._muted: + break root = e.__cause__ or e.__context__ err_str = str(e).lower() if ( @@ -112,6 +117,8 @@ def _read_loop(self): self.error_occurred.emit(f"Serial port error: {e}") break except OSError as e: + if self._muted: + break if e.errno in ( 5, # EIO - Input/output error (Linux) 6, # ENXIO - No such device or address (macOS/Linux) @@ -124,11 +131,15 @@ def _read_loop(self): self.error_occurred.emit(f"Unexpected error: {e}") break except Exception as e: + if self._muted: + break self.error_occurred.emit(f"Unexpected error: {e}") break def _emit_line(self, line): """Emit a line with optional timestamp""" + if self._muted: + return # Remove ANSI codes for timestamp detection import re ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')