Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src-tauri/src/commands/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,12 @@ pub async fn reset_circuit_breaker(
.unwrap_or_else(|| provider_id.clone());

// 创建故障转移切换管理器并执行切换
let switch_manager =
crate::proxy::failover_switch::FailoverSwitchManager::new(db.clone());
let switch_manager = crate::proxy::failover_switch::FailoverSwitchManager::new(
db.clone(),
Some(app_handle),
);
if let Err(e) = switch_manager
.try_switch(Some(&app_handle), &app_type, &provider_id, &provider_name)
.try_switch(&app_type, &provider_id, &provider_name)
.await
{
log::error!("[Recovery] 自动切换失败: {e}");
Expand Down
11 changes: 6 additions & 5 deletions src-tauri/src/proxy/failover_switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ pub struct FailoverSwitchManager {
/// 正在处理中的切换(key = "app_type:provider_id")
pending_switches: Arc<RwLock<HashSet<String>>>,
db: Arc<Database>,
/// AppHandle,用于 UI 更新和事件发射
app_handle: Option<Arc<tauri::AppHandle>>,
}

impl FailoverSwitchManager {
pub fn new(db: Arc<Database>) -> Self {
pub fn new(db: Arc<Database>, app_handle: Option<tauri::AppHandle>) -> Self {
Self {
pending_switches: Arc::new(RwLock::new(HashSet::new())),
db,
app_handle: app_handle.map(Arc::new),
}
}

Expand All @@ -40,7 +43,6 @@ impl FailoverSwitchManager {
/// - `Err(e)` - 切换过程中发生错误
pub async fn try_switch(
&self,
app_handle: Option<&tauri::AppHandle>,
app_type: &str,
provider_id: &str,
provider_name: &str,
Expand All @@ -59,7 +61,7 @@ impl FailoverSwitchManager {

// 执行切换(确保最后清理 pending 标记)
let result = self
.do_switch(app_handle, app_type, provider_id, provider_name)
.do_switch(app_type, provider_id, provider_name)
.await;

// 清理 pending 标记
Expand All @@ -73,7 +75,6 @@ impl FailoverSwitchManager {

async fn do_switch(
&self,
app_handle: Option<&tauri::AppHandle>,
app_type: &str,
provider_id: &str,
provider_name: &str,
Expand All @@ -97,7 +98,7 @@ impl FailoverSwitchManager {

let mut switched = false;

if let Some(app) = app_handle {
if let Some(app) = self.app_handle.as_ref().map(|a| a.as_ref()) {
if let Some(app_state) = app.try_state::<crate::store::AppState>() {
switched = app_state
.proxy_service
Expand Down
13 changes: 4 additions & 9 deletions src-tauri/src/proxy/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,14 @@ impl RequestForwarder {
if should_switch {
status.failover_count += 1;

// 异步触发供应商切换,更新 UI/托盘,并把“当前供应商”同步为实际使用的 provider
// 异步触发供应商切换,更新 UI/托盘,并把“当前供应商”同步为实际使用的 provider
let fm = self.failover_manager.clone();
let ah = self.app_handle.clone();
let pid = provider.id.clone();
let pname = provider.name.clone();
let at = app_type_str.to_string();

tokio::spawn(async move {
let _ = fm.try_switch(ah.as_ref(), &at, &pid, &pname).await;
let _ = fm.try_switch(&at, &pid, &pname).await;
});
}
// 重新计算成功率
Expand Down Expand Up @@ -362,14 +361,13 @@ impl RequestForwarder {

// 异步触发供应商切换,更新 UI/托盘
let fm = self.failover_manager.clone();
let ah = self.app_handle.clone();
let pid = provider.id.clone();
let pname = provider.name.clone();
let at = app_type_str.to_string();

tokio::spawn(async move {
let _ = fm
.try_switch(ah.as_ref(), &at, &pid, &pname)
.try_switch(&at, &pid, &pname)
.await;
});
}
Expand Down Expand Up @@ -556,14 +554,11 @@ impl RequestForwarder {
if should_switch {
status.failover_count += 1;
let fm = self.failover_manager.clone();
let ah = self.app_handle.clone();
let pid = provider.id.clone();
let pname = provider.name.clone();
let at = app_type_str.to_string();
tokio::spawn(async move {
let _ = fm
.try_switch(ah.as_ref(), &at, &pid, &pname)
.await;
let _ = fm.try_switch(&at, &pid, &pname).await;
});
}
if status.total_requests > 0 {
Expand Down
77 changes: 76 additions & 1 deletion src-tauri/src/proxy/provider_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::app_config::AppType;
use crate::database::Database;
use crate::error::AppError;
use crate::provider::Provider;
use crate::proxy::circuit_breaker::{AllowResult, CircuitBreaker, CircuitBreakerConfig};
use crate::proxy::circuit_breaker::{AllowResult, CircuitBreaker, CircuitBreakerConfig, CircuitState};
use crate::proxy::failover_switch::FailoverSwitchManager;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -18,6 +19,8 @@ pub struct ProviderRouter {
db: Arc<Database>,
/// 熔断器管理器 - key 格式: "app_type:provider_id"
circuit_breakers: Arc<RwLock<HashMap<String, Arc<CircuitBreaker>>>>,
/// 自动回切管理器(由 ProxyServer 构造时注入)
failover_switch_manager: Arc<RwLock<Option<Arc<FailoverSwitchManager>>>>,
}

impl ProviderRouter {
Expand All @@ -26,9 +29,18 @@ impl ProviderRouter {
Self {
db,
circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
failover_switch_manager: Arc::new(RwLock::new(None)),
}
}

/// Installs the automatic-failback switch manager on this router.
///
/// The future resolves only after the manager has been written into the
/// shared slot, so there is no fire-and-forget race. Callers must await this
/// method before letting the proxy handle requests.
pub async fn set_failover_switch_manager(&self, mgr: Arc<FailoverSwitchManager>) {
    // Take the write guard explicitly; it is released when `slot` goes out of scope.
    let mut slot = self.failover_switch_manager.write().await;
    *slot = Some(mgr);
}

/// 选择可用的供应商(支持故障转移)
///
/// 返回按优先级排序的可用供应商列表:
Expand Down Expand Up @@ -143,6 +155,9 @@ impl ProviderRouter {

if success {
breaker.record_success(used_half_open_permit).await;

// [FO-BACK] 自动回切:当前用的是 P2+,且 P1 已从熔断恢复
self.trigger_failback_if_needed(provider_id, app_type).await;
} else {
breaker.record_failure(used_half_open_permit).await;
}
Expand Down Expand Up @@ -218,6 +233,66 @@ impl ProviderRouter {
}
}

/// 检查是否需要触发自动回切并在后台执行
///
/// 当当前使用 P2+,且 P1 熔断器已恢复为 Closed 时,
/// 异步触发切换到 P1。Failback 失败仅记录日志,不向上传播。
async fn trigger_failback_if_needed(&self, provider_id: &str, app_type: &str) {
let Some(ref mgr) = *self.failover_switch_manager.read().await else {
return;
};

let Ok(ordered_ids) = self.db.get_failover_queue(app_type) else {
log::warn!("[FO-BACK] 读取 failover_queue 失败,跳过本次回切");
return;
};
let Some(p1_id) = ordered_ids.first().map(|item| &item.provider_id) else {
return;
};

// 已在 P1,无需回切
if provider_id == p1_id {
return;
}

// 检查 P1 熔断器状态
let p1_circuit_key = format!("{app_type}:{p1_id}");
let p1_breaker = self.get_or_create_circuit_breaker(&p1_circuit_key).await;
if p1_breaker.get_state().await != CircuitState::Closed {
return;
}

let Ok(all) = self.db.get_all_providers(app_type) else {
log::warn!("[FO-BACK] 读取 providers 失败,跳过本次回切");
return;
};
let Some(p1) = all.get(p1_id) else {
return;
};

log::info!(
"[FO-BACK] P1 {} 恢复为 Closed(当前: {}),触发自动回切",
p1.name,
provider_id
);

let fm = mgr.clone();
let at = app_type.to_string();
let pid = p1.id.clone();
let pname = p1.name.clone();
tokio::spawn(async move {
match fm.try_switch(&at, &pid, &pname).await {
Ok(true) => {}
Ok(false) => {
log::debug!("[FO-BACK] 切换已跳过(可能在进行中或已就绪)");
}
Err(e) => {
log::error!("[FO-BACK] try_switch 失败: {}", e);
}
}
});
}

/// 获取或创建熔断器
async fn get_or_create_circuit_breaker(&self, key: &str) -> Arc<CircuitBreaker> {
// 先尝试读锁获取
Expand Down
2 changes: 1 addition & 1 deletion src-tauri/src/proxy/response_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ mod tests {
provider_router: Arc::new(ProviderRouter::new(db.clone())),
gemini_shadow: Arc::new(GeminiShadowStore::default()),
app_handle: None,
failover_manager: Arc::new(FailoverSwitchManager::new(db)),
failover_manager: Arc::new(FailoverSwitchManager::new(db, None)),
}
}

Expand Down
13 changes: 12 additions & 1 deletion src-tauri/src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub struct ProxyServer {
shutdown_tx: Arc<RwLock<Option<oneshot::Sender<()>>>>,
/// 服务器任务句柄,用于等待服务器实际关闭
server_handle: Arc<RwLock<Option<JoinHandle<()>>>>,
/// failback manager 注入任务,等待完成后 proxy 才接受请求
pub settle_join: Option<JoinHandle<()>>,
}

impl ProxyServer {
Expand All @@ -62,7 +64,15 @@ impl ProxyServer {
// 创建共享的 ProviderRouter(熔断器状态将跨所有请求保持)
let provider_router = Arc::new(ProviderRouter::new(db.clone()));
// 创建故障转移切换管理器
let failover_manager = Arc::new(FailoverSwitchManager::new(db.clone()));
let failover_manager = Arc::new(FailoverSwitchManager::new(db.clone(), app_handle.clone()));
// [FO-BACK] 异步注入回切管理器,写入完成后 caller 应 await settle_join
let provider_router_clone = provider_router.clone();
let failover_manager_clone = failover_manager.clone();
let settle_join = tokio::spawn(async move {
provider_router_clone
.set_failover_switch_manager(failover_manager_clone)
.await;
});

let state = ProxyState {
db,
Expand All @@ -81,6 +91,7 @@ impl ProxyServer {
state,
shutdown_tx: Arc::new(RwLock::new(None)),
server_handle: Arc::new(RwLock::new(None)),
settle_join: Some(settle_join),
}
}

Expand Down
6 changes: 4 additions & 2 deletions src-tauri/src/services/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ impl ProxyService {

// 4. 创建并启动服务器
let app_handle = self.app_handle.read().await.clone();
let server = ProxyServer::new(config.clone(), self.db.clone(), app_handle);
let mut server = ProxyServer::new(config.clone(), self.db.clone(), app_handle);
server.settle_join.take().unwrap().await;
let info = server
.start()
.await
Expand Down Expand Up @@ -1881,7 +1882,8 @@ impl ProxyService {
}

let app_handle = self.app_handle.read().await.clone();
let new_server = ProxyServer::new(new_config, self.db.clone(), app_handle);
let mut new_server = ProxyServer::new(new_config.clone(), self.db.clone(), app_handle.clone());
new_server.settle_join.take().unwrap().await;
new_server
.start()
.await
Expand Down
Loading