Skip to content
273 changes: 271 additions & 2 deletions esp_flasher/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,244 @@
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QPushButton, QLabel, QComboBox,
QFileDialog, QTextEdit, QGroupBox, QGridLayout,
QLineEdit)
QLineEdit, QDialog, QListWidget, QListWidgetItem,
QProgressBar, QMessageBox)
from PyQt6.QtGui import QColor, QPalette
from PyQt6.QtCore import pyqtSignal, QObject, Qt, QSettings
from PyQt6.QtCore import pyqtSignal, QObject, Qt, QSettings, QTimer

from esp_flasher.own_esptool import get_port_list, colorize, COLOR_RED, COLOR_GREEN, COLOR_CYAN, COLOR_YELLOW
from esp_flasher.const import (__version__, DEFAULT_WINDOW_WIDTH,
DEFAULT_WINDOW_HEIGHT, DEFAULT_WINDOW_X,
DEFAULT_WINDOW_Y)
from esp_flasher.console_color import ColoredConsole

class ImprovDialog(QDialog):
"""Dialog for Improv WiFi provisioning."""

def __init__(self, serial_port, parent=None):
super().__init__(parent)
self.setWindowTitle("Improv WiFi Provisioning")
self.setMinimumWidth(420)
self._serial_port = serial_port # reuse already-open port (never close it)
self._improv = None
self._is_provisioning = False # True only after we send credentials
self._init_ui()
self._start_improv()

def _init_ui(self):
layout = QVBoxLayout()

# Device info
self.info_label = QLabel("Detecting Improv device...")
layout.addWidget(self.info_label)

# WiFi network list
net_group = QGroupBox("WiFi Networks")
net_layout = QVBoxLayout()
self.network_list = QListWidget()
self.network_list.itemDoubleClicked.connect(self._on_network_selected)
net_layout.addWidget(self.network_list)

scan_btn = QPushButton("Scan Networks")
scan_btn.clicked.connect(self._scan_networks)
net_layout.addWidget(scan_btn)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
net_group.setLayout(net_layout)
layout.addWidget(net_group)

# Credentials
cred_group = QGroupBox("WiFi Credentials")
cred_layout = QGridLayout()
cred_layout.addWidget(QLabel("SSID:"), 0, 0)
self.ssid_input = QLineEdit()
cred_layout.addWidget(self.ssid_input, 0, 1)
cred_layout.addWidget(QLabel("Password:"), 1, 0)
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.EchoMode.Password)
cred_layout.addWidget(self.password_input, 1, 1)
cred_group.setLayout(cred_layout)
layout.addWidget(cred_group)

# Status / progress
self.status_label = QLabel("")
layout.addWidget(self.status_label)
self.progress = QProgressBar()
self.progress.setRange(0, 0) # indeterminate
self.progress.setVisible(False)
layout.addWidget(self.progress)

# Buttons
btn_layout = QHBoxLayout()
self.provision_btn = QPushButton("Provision")
self.provision_btn.clicked.connect(self._provision)
self.provision_btn.setEnabled(False)
btn_layout.addWidget(self.provision_btn)

close_btn = QPushButton("Close")
close_btn.clicked.connect(self.close)
btn_layout.addWidget(close_btn)
layout.addLayout(btn_layout)

self.setLayout(layout)

def _start_improv(self):
"""Start Improv on the already-open serial port (same as JS: port stays open)."""
# Flush stale console data from buffer
self._serial_port.reset_input_buffer()

from esp_flasher.improv import ImprovManager
self._improv = ImprovManager(self._serial_port)
self._improv.state_changed.connect(self._on_state_changed)
self._improv.error_received.connect(self._on_error)
self._improv.device_info_received.connect(self._on_device_info)
self._improv.log_message.connect(self._on_log)
self._improv.provisioned.connect(self._on_provisioned)
self._improv.start()

# Periodically poll for device until it responds
self._detect_attempts = 0
self._detect_timer = QTimer(self)
self._detect_timer.timeout.connect(self._poll_device_state)
self._detect_timer.start(1000)
# Also send first request immediately
QTimer.singleShot(100, self._poll_device_state)

def _poll_device_state(self):
"""Send request_current_state until device responds or we give up."""
if not self._improv:
self._detect_timer.stop()
return
if self._improv.device_state is not None:
# Device has responded, stop polling
self._detect_timer.stop()
return
self._detect_attempts += 1
if self._detect_attempts > 15:
self._detect_timer.stop()
self.status_label.setText("No Improv device detected (timeout)")
return
self.status_label.setText(
f"Detecting Improv device... (attempt {self._detect_attempts}/15)"
)
self._improv.request_current_state()

def _on_state_changed(self, state):
from esp_flasher.improv import STATE_READY, STATE_PROVISIONING, STATE_PROVISIONED, STATE_NAMES
name = STATE_NAMES.get(state, f"Unknown ({state})")
self.progress.setVisible(False)
if state == STATE_READY:
self.status_label.setText(f"State: {name}")
self.provision_btn.setEnabled(True)
# Auto-request device info
threading.Thread(target=self._request_info_bg, daemon=True).start()
elif state == STATE_PROVISIONING:
self.status_label.setText("Connecting to WiFi...")
self.provision_btn.setEnabled(False)
self.progress.setVisible(True)
elif state == STATE_PROVISIONED:
if self._is_provisioning:
self.status_label.setText("✓ WiFi provisioned successfully!")
self.provision_btn.setEnabled(False)
else:
self.status_label.setText("Device already connected to WiFi")
self.provision_btn.setEnabled(True)
# Request device info in both cases
threading.Thread(target=self._request_info_bg, daemon=True).start()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

def _on_error(self, error):
from esp_flasher.improv import ERROR_NAMES, ERROR_NONE
if error != ERROR_NONE:
self.status_label.setText(f"Error: {ERROR_NAMES.get(error, 'Unknown')}")
self.progress.setVisible(False)
self.provision_btn.setEnabled(True)

def _on_device_info(self, info):
parts = []
labels = ["Firmware", "Version", "Chip", "Name"]
for i, val in enumerate(info):
if val:
label = labels[i] if i < len(labels) else f"Field{i}"
parts.append(f"{label}: {val}")
self.info_label.setText(" | ".join(parts) if parts else "Device detected")

def _on_log(self, msg):
self.status_label.setText(msg)

def _on_provisioned(self, result):
self.progress.setVisible(False)
self.provision_btn.setEnabled(False)
msg = "✓ WiFi provisioned successfully!"
if result:
msg += f"\n{', '.join(result)}"
self.status_label.setText(msg)

def _on_network_selected(self, item):
ssid = item.data(Qt.ItemDataRole.UserRole)
if ssid:
self.ssid_input.setText(ssid)
self.password_input.setFocus()

def _scan_networks(self):
self.network_list.clear()
self.status_label.setText("Scanning WiFi networks...")
self.progress.setVisible(True)
threading.Thread(target=self._scan_bg, daemon=True).start()

def _scan_bg(self):
networks = self._improv.request_wifi_networks()
# Sort by RSSI descending
networks.sort(key=lambda n: n[1], reverse=True)
# Update UI from main thread via signal
QTimer.singleShot(0, lambda: self._update_network_list(networks))

def _update_network_list(self, networks):
self.network_list.clear()
self.progress.setVisible(False)
if not networks:
self.status_label.setText("No networks found")
return
for ssid, rssi, secured in networks:
lock = "🔒 " if secured else " "
item = QListWidgetItem(f"{lock}{ssid} ({rssi} dBm)")
item.setData(Qt.ItemDataRole.UserRole, ssid)
self.network_list.addItem(item)
self.status_label.setText(f"Found {len(networks)} networks")

def _request_info_bg(self):
self._improv.request_device_info()

def _provision(self):
ssid = self.ssid_input.text().strip()
password = self.password_input.text()
if not ssid:
self.status_label.setText("Please enter an SSID")
return
self._is_provisioning = True
self.provision_btn.setEnabled(False)
self.progress.setVisible(True)
self.status_label.setText(f"Provisioning WiFi: {ssid}...")
threading.Thread(target=self._provision_bg, args=(ssid, password), daemon=True).start()

def _provision_bg(self, ssid, password):
result = self._improv.send_wifi_settings(ssid, password)
if result is None:
QTimer.singleShot(0, lambda: self._provision_failed())

def _provision_failed(self):
self._is_provisioning = False
self.progress.setVisible(False)
self.provision_btn.setEnabled(True)
self.status_label.setText("WiFi provisioning failed")

def closeEvent(self, event):
if hasattr(self, '_detect_timer') and self._detect_timer.isActive():
self._detect_timer.stop()
if self._improv:
self._improv.stop()
self._improv = None
# Do NOT close the serial port — caller owns it and will restart console reader
super().closeEvent(event)


class FlashingThread(threading.Thread):
def __init__(self, firmware, port, finished=None, failed=None):
threading.Thread.__init__(self)
Expand Down Expand Up @@ -118,6 +346,9 @@ def init_ui(self):
self.flash_button = QPushButton("Flash ESP")
self.flash_button.clicked.connect(self.flash_esp)
actions_layout.addWidget(self.flash_button)
self.improv_button = QPushButton("Improv WiFi")
self.improv_button.clicked.connect(self.open_improv)
actions_layout.addWidget(self.improv_button)
actions_group_box.setLayout(actions_layout)

# Console with input field
Expand Down Expand Up @@ -321,6 +552,44 @@ def _start_flash_worker(self):
)
self._flash_worker.start()

def open_improv(self):
"""Open Improv WiFi provisioning dialog.
Matches JS: stop console reader, pass open port to Improv, restart reader on close."""
if self._is_flashing:
self.show_log_error("Cannot use Improv while flashing")
return
if not self._serial_port or not self._serial_port.is_open:
self.show_log_error("Connect to a serial port first")
return

# Stop console reader — disconnect signals first, then stop thread
if self._serial_reader:
self._serial_reader.line_received.disconnect(self.append_log_line)
self._serial_reader.error_occurred.disconnect(self.handle_serial_error)
self._serial_reader.stop()
# Wait until thread is truly dead
if self._serial_reader.thread and self._serial_reader.thread.is_alive():
self._serial_reader.thread.join(timeout=3.0)
self._serial_reader = None

# Disable console input while in Improv mode
self.input_field.setEnabled(False)
self.send_button.setEnabled(False)

# Open Improv dialog with the same open port (port stays open, no ESP reset)
dlg = ImprovDialog(self._serial_port, parent=self)
dlg.exec()

# Restart console reader on the same open port (like JS reconnectConsole)
if self._serial_port and self._serial_port.is_open:
from esp_flasher.serial_console import SerialReader
self._serial_reader = SerialReader(self._serial_port)
self._serial_reader.line_received.connect(self.append_log_line)
self._serial_reader.error_occurred.connect(self.handle_serial_error)
self._serial_reader.start()
self.input_field.setEnabled(True)
self.send_button.setEnabled(True)

def on_flash_finished(self):
"""Called when flashing is complete"""
print(colorize("\nFlashing complete!", COLOR_GREEN))
Expand Down
Loading