Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src-tauri/src/commands/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,12 @@ pub async fn reset_circuit_breaker(
.unwrap_or_else(|| provider_id.clone());

// 创建故障转移切换管理器并执行切换
let switch_manager =
crate::proxy::failover_switch::FailoverSwitchManager::new(db.clone());
let switch_manager = crate::proxy::failover_switch::FailoverSwitchManager::new(
db.clone(),
Some(app_handle),
);
if let Err(e) = switch_manager
.try_switch(Some(&app_handle), &app_type, &provider_id, &provider_name)
.try_switch(&app_type, &provider_id, &provider_name)
.await
{
log::error!("[Recovery] 自动切换失败: {e}");
Expand Down
17 changes: 16 additions & 1 deletion src-tauri/src/proxy/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use super::log_codes::cb as log_cb;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -77,6 +77,8 @@ pub struct CircuitBreaker {
config: Arc<RwLock<CircuitBreakerConfig>>,
/// 半开状态已放行的请求数(用于限流)
half_open_requests: Arc<AtomicU32>,
/// 是否曾经进入过 Open(用于判断"真正恢复"而非"从未失败")
ever_tripped: Arc<AtomicBool>,
}

/// 熔断器放行结果
Expand All @@ -101,6 +103,7 @@ impl CircuitBreaker {
last_opened_at: Arc::new(RwLock::new(None)),
config: Arc::new(RwLock::new(config)),
half_open_requests: Arc::new(AtomicU32::new(0)),
ever_tripped: Arc::new(AtomicBool::new(false)),
}
}

Expand Down Expand Up @@ -348,6 +351,7 @@ impl CircuitBreaker {
*self.last_opened_at.write().await = Some(Instant::now());
self.consecutive_failures.store(0, Ordering::SeqCst);
self.consecutive_successes.store(0, Ordering::SeqCst);
self.ever_tripped.store(true, Ordering::SeqCst);
}

/// 转换到半开状态
Expand All @@ -371,6 +375,17 @@ impl CircuitBreaker {
// 重置计数器
self.total_requests.store(0, Ordering::SeqCst);
self.failed_requests.store(0, Ordering::SeqCst);
// 手动重置时清 ever_tripped(新周期开始)
self.ever_tripped.store(false, Ordering::SeqCst);
}

/// 是否曾经进入过 Open 状态
///
/// 用于判断"真正从故障中恢复"而非"从未失败过的正常 Closed"。
/// 当 P1 刚从 Open/HalfOpen 恢复到 Closed 时返回 true,
/// P1 正常运行未触发熔断时返回 false。
pub fn has_been_open(&self) -> bool {
self.ever_tripped.load(Ordering::SeqCst)
}
}

Expand Down
11 changes: 6 additions & 5 deletions src-tauri/src/proxy/failover_switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ pub struct FailoverSwitchManager {
/// 正在处理中的切换(key = "app_type:provider_id")
pending_switches: Arc<RwLock<HashSet<String>>>,
db: Arc<Database>,
/// AppHandle,用于 UI 更新和事件发射
app_handle: Option<Arc<tauri::AppHandle>>,
}

impl FailoverSwitchManager {
pub fn new(db: Arc<Database>) -> Self {
pub fn new(db: Arc<Database>, app_handle: Option<tauri::AppHandle>) -> Self {
Self {
pending_switches: Arc::new(RwLock::new(HashSet::new())),
db,
app_handle: app_handle.map(Arc::new),
}
}

Expand All @@ -40,7 +43,6 @@ impl FailoverSwitchManager {
/// - `Err(e)` - 切换过程中发生错误
pub async fn try_switch(
&self,
app_handle: Option<&tauri::AppHandle>,
app_type: &str,
provider_id: &str,
provider_name: &str,
Expand All @@ -59,7 +61,7 @@ impl FailoverSwitchManager {

// 执行切换(确保最后清理 pending 标记)
let result = self
.do_switch(app_handle, app_type, provider_id, provider_name)
.do_switch(app_type, provider_id, provider_name)
.await;

// 清理 pending 标记
Expand All @@ -73,7 +75,6 @@ impl FailoverSwitchManager {

async fn do_switch(
&self,
app_handle: Option<&tauri::AppHandle>,
app_type: &str,
provider_id: &str,
provider_name: &str,
Expand All @@ -97,7 +98,7 @@ impl FailoverSwitchManager {

let mut switched = false;

if let Some(app) = app_handle {
if let Some(app) = self.app_handle.as_ref().map(|a| a.as_ref()) {
if let Some(app_state) = app.try_state::<crate::store::AppState>() {
switched = app_state
.proxy_service
Expand Down
13 changes: 4 additions & 9 deletions src-tauri/src/proxy/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,14 @@ impl RequestForwarder {
if should_switch {
status.failover_count += 1;

// 异步触发供应商切换,更新 UI/托盘,并把当前供应商”同步为实际使用的 provider
// 异步触发供应商切换,更新 UI/托盘,并把当前供应商”同步为实际使用的 provider
let fm = self.failover_manager.clone();
let ah = self.app_handle.clone();
let pid = provider.id.clone();
let pname = provider.name.clone();
let at = app_type_str.to_string();

tokio::spawn(async move {
let _ = fm.try_switch(ah.as_ref(), &at, &pid, &pname).await;
let _ = fm.try_switch(&at, &pid, &pname).await;
});
}
// 重新计算成功率
Expand Down Expand Up @@ -362,14 +361,13 @@ impl RequestForwarder {

// 异步触发供应商切换,更新 UI/托盘
let fm = self.failover_manager.clone();
let ah = self.app_handle.clone();
let pid = provider.id.clone();
let pname = provider.name.clone();
let at = app_type_str.to_string();

tokio::spawn(async move {
let _ = fm
.try_switch(ah.as_ref(), &at, &pid, &pname)
.try_switch(&at, &pid, &pname)
.await;
});
}
Expand Down Expand Up @@ -556,14 +554,11 @@ impl RequestForwarder {
if should_switch {
status.failover_count += 1;
let fm = self.failover_manager.clone();
let ah = self.app_handle.clone();
let pid = provider.id.clone();
let pname = provider.name.clone();
let at = app_type_str.to_string();
tokio::spawn(async move {
let _ = fm
.try_switch(ah.as_ref(), &at, &pid, &pname)
.await;
let _ = fm.try_switch(&at, &pid, &pname).await;
});
}
if status.total_requests > 0 {
Expand Down
90 changes: 89 additions & 1 deletion src-tauri/src/proxy/provider_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::app_config::AppType;
use crate::database::Database;
use crate::error::AppError;
use crate::provider::Provider;
use crate::proxy::circuit_breaker::{AllowResult, CircuitBreaker, CircuitBreakerConfig};
use crate::proxy::circuit_breaker::{AllowResult, CircuitBreaker, CircuitBreakerConfig, CircuitState};
use crate::proxy::failover_switch::FailoverSwitchManager;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -18,6 +19,8 @@ pub struct ProviderRouter {
db: Arc<Database>,
/// 熔断器管理器 - key 格式: "app_type:provider_id"
circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
/// 自动回切管理器(由 ProxyServer 构造时注入)
failover_switch_manager: Arc<RwLock<Option<Arc<FailoverSwitchManager>>>>,
}

impl ProviderRouter {
Expand All @@ -26,9 +29,18 @@ impl ProviderRouter {
Self {
db,
circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
failover_switch_manager: Arc::new(RwLock::new(None)),
}
}

/// 异步设置自动回切管理器
///
/// 等写入完成后返回,消除 fire-and-forget 竞态。
/// caller 必须 await 此方法后再让 proxy 处理请求。
pub async fn set_failover_switch_manager(&self, mgr: Arc<FailoverSwitchManager>) {
*self.failover_switch_manager.write().await = Some(mgr);
}

/// 选择可用的供应商(支持故障转移)
///
/// 返回按优先级排序的可用供应商列表:
Expand Down Expand Up @@ -143,6 +155,9 @@ impl ProviderRouter {

if success {
breaker.record_success(used_half_open_permit).await;

// [FO-BACK] 自动回切:当前用的是 P2+,且 P1 已从熔断恢复
self.trigger_failback_if_needed(provider_id, app_type).await;
} else {
breaker.record_failure(used_half_open_permit).await;
}
Expand Down Expand Up @@ -218,6 +233,79 @@ impl ProviderRouter {
}
}

/// 检查是否需要触发自动回切并在后台执行
///
/// 当当前使用 P2+,且 P1 熔断器已恢复为 Closed 时,
/// 异步触发切换到 P1。Failback 失败仅记录日志,不向上传播。
async fn trigger_failback_if_needed(&self, provider_id: &str, app_type: &str) {
let Some(ref mgr) = *self.failover_switch_manager.read().await else {
return;
};

// [P1-A] 故障转移总开关关闭时跳过回切
let auto_failover_enabled = match self.db.get_proxy_config_for_app(app_type).await {
Ok(config) => config.auto_failover_enabled,
Err(_) => {
log::warn!("[FO-BACK] 读取 proxy_config 失败,跳过本次回切");
return;
}
};
if !auto_failover_enabled {
return;
}

let Ok(ordered_ids) = self.db.get_failover_queue(app_type) else {
log::warn!("[FO-BACK] 读取 failover_queue 失败,跳过本次回切");
return;
};
let Some(p1_id) = ordered_ids.first().map(|item| &item.provider_id) else {
return;
};

// 已在 P1,无需回切
if provider_id == p1_id {
return;
}

// 检查 P1 熔断器状态
let p1_circuit_key = format!("{app_type}:{p1_id}");
let p1_breaker = self.get_or_create_circuit_breaker(&p1_circuit_key).await;
// [P1-B] 必须同时满足:P1 从 Open/HalfOpen 恢复到 Closed(而非从未失败过)
if p1_breaker.get_state().await != CircuitState::Closed || !p1_breaker.has_been_open() {
return;
}

let Ok(all) = self.db.get_all_providers(app_type) else {
log::warn!("[FO-BACK] 读取 providers 失败,跳过本次回切");
return;
};
let Some(p1) = all.get(p1_id) else {
return;
};

log::info!(
"[FO-BACK] P1 {} 恢复为 Closed(当前: {}),触发自动回切",
p1.name,
provider_id
);

let fm = mgr.clone();
let at = app_type.to_string();
let pid = p1.id.clone();
let pname = p1.name.clone();
tokio::spawn(async move {
match fm.try_switch(&at, &pid, &pname).await {
Ok(true) => {}
Ok(false) => {
log::debug!("[FO-BACK] 切换已跳过(可能在进行中或已就绪)");
}
Err(e) => {
log::error!("[FO-BACK] try_switch 失败: {}", e);
}
}
});
}

/// 获取或创建熔断器
async fn get_or_create_circuit_breaker(&self, key: &str) -> Arc<CircuitBreaker> {
// 先尝试读锁获取
Expand Down
2 changes: 1 addition & 1 deletion src-tauri/src/proxy/response_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ mod tests {
provider_router: Arc::new(ProviderRouter::new(db.clone())),
gemini_shadow: Arc::new(GeminiShadowStore::default()),
app_handle: None,
failover_manager: Arc::new(FailoverSwitchManager::new(db)),
failover_manager: Arc::new(FailoverSwitchManager::new(db, None)),
}
}

Expand Down
13 changes: 12 additions & 1 deletion src-tauri/src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub struct ProxyServer {
shutdown_tx: Arc<RwLock<Option<oneshot::Sender<()>>>>,
/// 服务器任务句柄,用于等待服务器实际关闭
server_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
/// failback manager 注入任务,等待完成后 proxy 才接受请求
pub settle_join: Option<JoinHandle<()>>,
}

impl ProxyServer {
Expand All @@ -62,7 +64,15 @@ impl ProxyServer {
// 创建共享的 ProviderRouter(熔断器状态将跨所有请求保持)
let provider_router = Arc::new(ProviderRouter::new(db.clone()));
// 创建故障转移切换管理器
let failover_manager = Arc::new(FailoverSwitchManager::new(db.clone()));
let failover_manager = Arc::new(FailoverSwitchManager::new(db.clone(), app_handle.clone()));
// [FO-BACK] 异步注入回切管理器,写入完成后 caller 应 await settle_join
let provider_router_clone = provider_router.clone();
let failover_manager_clone = failover_manager.clone();
let settle_join = tokio::spawn(async move {
provider_router_clone
.set_failover_switch_manager(failover_manager_clone)
.await;
});

let state = ProxyState {
db,
Expand All @@ -81,6 +91,7 @@ impl ProxyServer {
state,
shutdown_tx: Arc::new(RwLock::new(None)),
server_handle: Arc::new(RwLock::new(None)),
settle_join: Some(settle_join),
}
}

Expand Down
6 changes: 4 additions & 2 deletions src-tauri/src/services/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ impl ProxyService {

// 4. 创建并启动服务器
let app_handle = self.app_handle.read().await.clone();
let server = ProxyServer::new(config.clone(), self.db.clone(), app_handle);
let mut server = ProxyServer::new(config.clone(), self.db.clone(), app_handle);
server.settle_join.take().unwrap().await;
let info = server
.start()
.await
Expand Down Expand Up @@ -1881,7 +1882,8 @@ impl ProxyService {
}

let app_handle = self.app_handle.read().await.clone();
let new_server = ProxyServer::new(new_config, self.db.clone(), app_handle);
let mut new_server = ProxyServer::new(new_config.clone(), self.db.clone(), app_handle.clone());
new_server.settle_join.take().unwrap().await;
new_server
.start()
.await
Expand Down