Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 160 additions & 13 deletions src-tauri/src/services/session_usage.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Claude Code 会话日志使用追踪
//!
//! 从 ~/.claude/projects/ 下的 JSONL 会话文件中提取 token 使用数据,
//! 实现无代理模式下的使用统计。
//! 实现无代理模式下的使用统计。同时解析子 Agent (sub-agent) 的对话记录。
//!
//! ## 数据流
//! ```text
//! ~/.claude/projects/*/*.jsonl → 增量解析 → 去重 → 费用计算 → proxy_request_logs 表
//! ~/.claude/projects/*/*.jsonl → 增量解析 → 去重 → 费用计算 → proxy_request_logs 表
//! ~/.claude/projects/*/<sessionId>/subagents/ → 增量解析 → 去重 → 费用计算 → proxy_request_logs 表
//! ```

use crate::config::get_claude_config_dir;
Expand Down Expand Up @@ -54,7 +55,7 @@ struct ParsedAssistantUsage {
session_id: Option<String>,
}

/// 同步 Claude Code 会话日志到使用统计数据库
/// 同步 Claude Code 会话日志到使用统计数据库(包括子 Agent 记录)
pub fn sync_claude_session_logs(db: &Database) -> Result<SessionSyncResult, AppError> {
let projects_dir = get_claude_config_dir().join("projects");
if !projects_dir.exists() {
Expand All @@ -73,13 +74,13 @@ pub fn sync_claude_session_logs(db: &Database) -> Result<SessionSyncResult, AppE
errors: vec![],
};

// 收集所有 .jsonl 文件
// 收集主会话 .jsonl 文件
let jsonl_files = collect_jsonl_files(&projects_dir);

for file_path in &jsonl_files {
result.files_scanned += 1;

match sync_single_file(db, file_path) {
match sync_single_file(db, file_path, "session_log", "session:") {
Ok((imported, skipped)) => {
result.imported += imported;
result.skipped += skipped;
Expand All @@ -92,12 +93,32 @@ pub fn sync_claude_session_logs(db: &Database) -> Result<SessionSyncResult, AppE
}
}

// 收集子 Agent 会话文件
let subagent_files = collect_subagent_files(&projects_dir);

for file_path in &subagent_files {
result.files_scanned += 1;

match sync_single_file(db, file_path, "session_subagent", "subagent:") {
Ok((imported, skipped)) => {
result.imported += imported;
result.skipped += skipped;
}
Err(e) => {
let msg = format!("{}: {e}", file_path.display());
log::warn!("[SESSION-SYNC] 子Agent文件解析失败: {msg}");
result.errors.push(msg);
}
}
}

if result.imported > 0 {
log::info!(
"[SESSION-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条, 扫描 {} 个文件",
"[SESSION-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条, 扫描 {} 个文件 (含 {} 个子Agent文件)",
result.imported,
result.skipped,
result.files_scanned
result.files_scanned,
subagent_files.len()
);
}

Expand Down Expand Up @@ -132,8 +153,87 @@ fn collect_jsonl_files(projects_dir: &Path) -> Vec<PathBuf> {
files
}

/// Collect every sub-agent transcript file under the projects directory.
///
/// Scanned layout: `~/.claude/projects/<project>/<sessionId>/subagents/**/agent-*.jsonl`.
/// Unreadable directories are silently skipped — a missing or permission-denied
/// directory simply contributes no files.
fn collect_subagent_files(projects_dir: &Path) -> Vec<PathBuf> {
    let mut files = Vec::new();

    if let Ok(project_entries) = fs::read_dir(projects_dir) {
        for project_entry in project_entries.flatten() {
            let project_dir = project_entry.path();
            if !project_dir.is_dir() {
                continue;
            }

            // Each subdirectory of a project is potentially a session directory.
            if let Ok(session_entries) = fs::read_dir(&project_dir) {
                for session_entry in session_entries.flatten() {
                    let session_dir = session_entry.path();
                    if !session_dir.is_dir() {
                        continue;
                    }

                    // Only sessions that actually have a subagents/ directory matter.
                    let subagents_dir = session_dir.join("subagents");
                    if subagents_dir.is_dir() {
                        collect_subagent_files_recursive(&subagents_dir, &mut files);
                    }
                }
            }
        }
    }

    files
}

/// Recursively scan a `subagents` directory for `agent-*.jsonl` transcripts at any depth.
///
/// Claude Code / workflow tooling may place sub-agent logs in deeper paths such as
/// `subagents/workflows/<runId>/`, so recursion depth is unbounded; only the file
/// name decides whether a transcript is collected for import.
fn collect_subagent_files_recursive(dir: &Path, files: &mut Vec<PathBuf>) {
    // An unreadable directory contributes nothing rather than aborting the scan.
    if let Ok(entries) = fs::read_dir(dir) {
        for entry in entries.flatten() {
            let path = entry.path();
            match entry.file_type() {
                Ok(ft) if ft.is_dir() => collect_subagent_files_recursive(&path, files),
                Ok(ft) if ft.is_file() && is_subagent_jsonl_file(&path) => files.push(path),
                // Symlinks, unknown types, and metadata errors are ignored.
                _ => {}
            }
        }
    }
}

/// Returns `true` when the path names a sub-agent transcript: `agent-*.jsonl`.
///
/// Only the final path component is inspected; non-UTF-8 file names never match.
fn is_subagent_jsonl_file(path: &Path) -> bool {
    let name_matches = path
        .file_name()
        .and_then(|n| n.to_str())
        .map_or(false, |name| name.starts_with("agent-"));
    let ext_matches = path.extension().and_then(|e| e.to_str()) == Some("jsonl");
    name_matches && ext_matches
}

/// 同步单个 JSONL 文件,返回 (imported, skipped)
fn sync_single_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppError> {
///
/// `data_source` 用于区分数据来源 ("session_log" 或 "session_subagent")
/// `request_id_prefix` 用于生成唯一的 request_id 前缀
fn sync_single_file(
db: &Database,
file_path: &Path,
data_source: &str,
request_id_prefix: &str,
) -> Result<(u32, u32), AppError> {
let file_path_str = file_path.to_string_lossy().to_string();

// 获取文件元数据
Expand Down Expand Up @@ -278,14 +378,14 @@ fn sync_single_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppEr
continue;
}

let request_id = format!("session:{}", msg.message_id);
let request_id = format!("{}{}", request_id_prefix, msg.message_id);

// 跳过 output_tokens 为 0 的无意义条目
if msg.output_tokens == 0 {
continue;
}

match insert_session_log_entry(db, &request_id, msg) {
match insert_session_log_entry(db, &request_id, msg, data_source) {
Ok(true) => imported += 1,
Ok(false) => skipped += 1,
Err(e) => {
Expand Down Expand Up @@ -335,10 +435,13 @@ fn update_sync_state(
}

/// 插入单条会话日志到 proxy_request_logs,返回是否成功插入 (true=新插入, false=已存在)
///
/// `data_source` 区分主会话 ("session_log") 和子 Agent ("session_subagent")
fn insert_session_log_entry(
db: &Database,
request_id: &str,
msg: &ParsedAssistantUsage,
data_source: &str,
) -> Result<bool, AppError> {
let conn = lock_conn!(db.conn);

Expand Down Expand Up @@ -432,11 +535,11 @@ fn insert_session_log_entry(
200i64, // status_code: 有 stop_reason 说明请求成功
Option::<String>::None, // error_message
msg.session_id,
Some("session_log"), // provider_type
Some(data_source), // provider_type: 使用 data_source 值
1i64, // is_streaming: Claude Code 通常使用流式
"1.0", // cost_multiplier
created_at,
"session_log", // data_source
data_source, // data_source: 区分主会话和子Agent
],
)
.map_err(|e| AppError::Database(format!("插入会话日志失败: {e}")))?;
Expand Down Expand Up @@ -527,7 +630,11 @@ pub fn get_data_source_breakdown(db: &Database) -> Result<Vec<DataSourceSummary>
let conn = lock_conn!(db.conn);

let mut stmt = conn.prepare(
"SELECT COALESCE(data_source, 'proxy') as ds, COUNT(*) as cnt,
"SELECT CASE COALESCE(data_source, 'proxy')
WHEN 'session_subagent' THEN 'session_log'
ELSE COALESCE(data_source, 'proxy')
END as ds,
COUNT(*) as cnt,
COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as cost
FROM proxy_request_logs
GROUP BY ds
Expand All @@ -553,6 +660,7 @@ pub fn get_data_source_breakdown(db: &Database) -> Result<Vec<DataSourceSummary>
#[cfg(test)]
mod tests {
use super::*;
use std::fs;

#[test]
fn test_parse_usage_from_jsonl_line() {
Expand Down Expand Up @@ -631,4 +739,43 @@ mod tests {
messages.insert("msg_1".to_string(), final_entry);
assert_eq!(messages.get("msg_1").unwrap().output_tokens, 1349);
}

#[test]
fn test_collect_subagent_files_recurses_nested_workflow_dirs() {
    // Layout under a temp root: project-a/session-1/subagents/workflows/run-1/
    let temp = tempfile::tempdir().unwrap();
    let root = temp.path();
    let subagents_dir = root.join("project-a").join("session-1").join("subagents");
    let nested_dir = subagents_dir.join("workflows").join("run-1");
    fs::create_dir_all(&nested_dir).unwrap();

    // Two transcripts that must be collected (top level + nested), and two
    // decoys that must be ignored (wrong prefix / wrong extension).
    for file in [
        subagents_dir.join("agent-root.jsonl"),
        nested_dir.join("agent-nested.jsonl"),
        nested_dir.join("not-agent.jsonl"),
        subagents_dir.join("agent-not-json.txt"),
    ] {
        fs::write(file, "").unwrap();
    }

    // Compare paths relative to the temp root so the assertion is
    // independent of where the OS places temporary directories.
    let mut collected: Vec<PathBuf> = collect_subagent_files(root)
        .into_iter()
        .map(|path| path.strip_prefix(root).unwrap().to_path_buf())
        .collect();
    collected.sort();

    let subagents_rel = PathBuf::from("project-a").join("session-1").join("subagents");
    let mut expected = vec![
        subagents_rel.join("agent-root.jsonl"),
        subagents_rel
            .join("workflows")
            .join("run-1")
            .join("agent-nested.jsonl"),
    ];
    expected.sort();

    assert_eq!(collected, expected);
}
}
19 changes: 17 additions & 2 deletions src-tauri/src/services/usage_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ pub struct RequestLogDetail {
/// SQL fragment: resolve provider_name with fallback for session-based entries.
/// Session logs use placeholder provider_ids (_session, _codex_session, _gemini_session)
/// that don't exist in the providers table — this COALESCE gives them readable names.
///
/// This helper is shared by raw log and daily rollup queries. Keep it independent from
/// proxy_request_logs-only columns such as data_source; usage_daily_rollups does not store them.
fn provider_name_coalesce(log_alias: &str, provider_alias: &str) -> String {
format!(
"COALESCE({provider_alias}.name, CASE {log_alias}.provider_id \
Expand All @@ -130,6 +133,16 @@ fn provider_name_coalesce(log_alias: &str, provider_alias: &str) -> String {
)
}

/// Build the SQL expression used for the display-facing `data_source` column.
///
/// The UI does not distinguish Claude main-session traffic from sub-agent traffic:
/// both are presented as `session_log`. The database still stores the raw
/// `data_source` value, so the split can be recovered later for debugging or
/// re-aggregation. Rows with a NULL `data_source` display as `proxy`.
fn display_data_source_expr(log_alias: &str) -> String {
    let source = format!("COALESCE({log_alias}.data_source, 'proxy')");
    format!("CASE {source} WHEN 'session_subagent' THEN 'session_log' ELSE {source} END")
}

impl Database {
/// 获取使用量汇总
pub fn get_usage_summary(
Expand Down Expand Up @@ -671,13 +684,14 @@ impl Database {
params.push(Box::new(offset as i64));

let logs_pname = provider_name_coalesce("l", "p");
let logs_data_source = display_data_source_expr("l");
let sql = format!(
"SELECT l.request_id, l.provider_id, {logs_pname} as provider_name, l.app_type, l.model,
l.request_model, l.cost_multiplier,
l.input_tokens, l.output_tokens, l.cache_read_tokens, l.cache_creation_tokens,
l.input_cost_usd, l.output_cost_usd, l.cache_read_cost_usd, l.cache_creation_cost_usd, l.total_cost_usd,
l.is_streaming, l.latency_ms, l.first_token_ms, l.duration_ms,
l.status_code, l.error_message, l.created_at, l.data_source
l.status_code, l.error_message, l.created_at, {logs_data_source} as data_source
FROM proxy_request_logs l
LEFT JOIN providers p ON l.provider_id = p.id AND l.app_type = p.app_type
{where_clause}
Expand Down Expand Up @@ -749,13 +763,14 @@ impl Database {
let conn = lock_conn!(self.conn);

let detail_pname = provider_name_coalesce("l", "p");
let detail_data_source = display_data_source_expr("l");
let detail_sql = format!(
"SELECT l.request_id, l.provider_id, {detail_pname} as provider_name, l.app_type, l.model,
l.request_model, l.cost_multiplier,
input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd,
is_streaming, latency_ms, first_token_ms, duration_ms,
status_code, error_message, created_at, l.data_source
status_code, error_message, created_at, {detail_data_source} as data_source
FROM proxy_request_logs l
LEFT JOIN providers p ON l.provider_id = p.id AND l.app_type = p.app_type
WHERE l.request_id = ?"
Expand Down
Loading