Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 221 additions & 20 deletions crates/tui/src/runtime_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,18 @@ pub async fn run_http_server(
config.default_text_model.clone(),
Some(options.workers),
);
let sessions_dir = default_sessions_dir().unwrap_or_else(|_| {
task_cfg
.data_dir
.parent()
.unwrap_or(&task_cfg.data_dir)
.join("sessions")
});
let runtime_threads = Arc::new(RuntimeThreadManager::open(
config.clone(),
workspace.clone(),
RuntimeThreadManagerConfig::from_task_data_dir(task_cfg.data_dir.clone()),
RuntimeThreadManagerConfig::from_task_data_dir(task_cfg.data_dir.clone())
.with_sessions_dir(sessions_dir.clone()),
)?);
let task_manager =
TaskManager::start_with_runtime_manager(task_cfg, config.clone(), runtime_threads.clone())
Expand Down Expand Up @@ -940,6 +948,16 @@ async fn resume_session_thread(
.await
.map_err(|e| ApiError::internal(format!("Failed to seed thread history: {e}")))?;

// Link the session to the new thread so that `ensure_engine_loaded`
// can restore the full message history from the session file.
if let Err(e) = state
.runtime_threads
.set_thread_session_id(&thread.id, &id)
.await
{
tracing::warn!("Failed to link session {id} to thread {}: {e}", thread.id);
}

let summary = format!(
"Resumed session '{}' ({} messages) into thread {}",
session.metadata.title, msg_count, thread.id
Expand Down Expand Up @@ -1014,6 +1032,19 @@ async fn create_session_from_thread(
.save_session(&session)
.map_err(|e| ApiError::internal(format!("Failed to save session: {e}")))?;

// Link the session to the thread so that `ensure_engine_loaded` can
// restore the full message history from the session file.
if let Err(e) = state
.runtime_threads
.set_thread_session_id(&detail.thread.id, &session_id)
.await
{
tracing::warn!(
"Failed to link session {session_id} to thread {}: {e}",
detail.thread.id
);
}

Ok((
StatusCode::CREATED,
Json(CreateSessionResponse {
Expand Down Expand Up @@ -1048,29 +1079,115 @@ fn messages_from_thread_detail(detail: &ThreadDetail) -> Vec<Message> {
let mut messages = Vec::new();

for turn in &detail.turns {
// Collect content blocks for the current assistant message.
// Multiple items (AgentMessage, AgentReasoning, ToolCall) may
// belong to the same assistant message, so we batch them.
let mut assistant_blocks: Vec<ContentBlock> = Vec::new();
let flush_assistant = |blocks: &mut Vec<ContentBlock>, msgs: &mut Vec<Message>| {
if !blocks.is_empty() {
msgs.push(Message {
role: "assistant".to_string(),
content: std::mem::take(blocks),
});
}
};

for item_id in &turn.item_ids {
let Some(item) = items_by_id.get(item_id.as_str()) else {
continue;
};
let role = match item.kind {
TurnItemKind::UserMessage => "user",
TurnItemKind::AgentMessage => "assistant",
_ => continue,
};
let Some(text) = item.detail.as_deref().map(str::trim) else {
continue;
};
if text.is_empty() {
continue;
match item.kind {
TurnItemKind::UserMessage => {
// Flush any pending assistant blocks before starting a
// new user message.
flush_assistant(&mut assistant_blocks, &mut messages);

let text = item.detail.as_deref().map(str::trim).unwrap_or("");
if !text.is_empty() {
messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: text.to_string(),
cache_control: None,
}],
});
}
}
TurnItemKind::AgentMessage => {
let text = item.detail.as_deref().map(str::trim).unwrap_or("");
if !text.is_empty() {
assistant_blocks.push(ContentBlock::Text {
text: text.to_string(),
cache_control: None,
});
}
}
TurnItemKind::AgentReasoning => {
let thinking = item.detail.as_deref().map(str::trim).unwrap_or("");
if !thinking.is_empty() {
assistant_blocks.push(ContentBlock::Thinking {
thinking: thinking.to_string(),
signature: None,
});
}
}
TurnItemKind::ToolCall => {
// Check metadata to distinguish tool_use from tool_result.
let meta = item.metadata.as_ref();
let is_tool_result = meta.and_then(|m| m.get("tool_result_for")).is_some();
if is_tool_result {
// tool_result blocks go in a user message.
// Flush any pending assistant blocks first.
flush_assistant(&mut assistant_blocks, &mut messages);

let tool_use_id = meta
.and_then(|m| m.get("tool_result_for"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let content = item.detail.as_deref().unwrap_or("").to_string();
let is_error = meta
.and_then(|m| m.get("is_error"))
.and_then(|v| v.as_bool())
.unwrap_or(false);
messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::ToolResult {
tool_use_id,
content,
is_error: if is_error { Some(true) } else { None },
content_blocks: None,
}],
});
} else {
// tool_use block — part of assistant message.
let tool_use_id = meta
.and_then(|m| m.get("tool_use_id"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let tool_name = meta
.and_then(|m| m.get("tool_name"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let input_str = item.detail.as_deref().unwrap_or("{}");
let input: serde_json::Value =
serde_json::from_str(input_str).unwrap_or(serde_json::Value::Null);
assistant_blocks.push(ContentBlock::ToolUse {
id: tool_use_id,
name: tool_name,
input,
caller: None,
});
}
}
// Skip other item kinds (file_change, command_execution, etc.)
_ => {}
}
messages.push(Message {
role: role.to_string(),
content: vec![ContentBlock::Text {
text: text.to_string(),
cache_control: None,
}],
});
}
// Flush any remaining assistant blocks.
flush_assistant(&mut assistant_blocks, &mut messages);
}

messages
Comment thread
gaord marked this conversation as resolved.
Expand Down Expand Up @@ -1193,8 +1310,20 @@ async fn save_current_session(
.save_session(&session)
.map_err(|e| ApiError::internal(format!("Failed to save session: {e}")))?;

// Link the session to the thread so that `ensure_engine_loaded` can
// restore the full message history (including thinking/tool blocks)
// from the session file instead of reconstructing from turns.
let session_id = session.metadata.id.clone();
if let Err(e) = state
.runtime_threads
.set_thread_session_id(&thread_id, &session_id)
.await
{
tracing::warn!("Failed to link session {session_id} to thread {thread_id}: {e}");
}

Ok(Json(SaveSessionResponse {
session_id: session.metadata.id.clone(),
session_id,
session: session_to_detail(session),
}))
}
Expand Down Expand Up @@ -3597,7 +3726,8 @@ mod tests {
let runtime_threads: SharedRuntimeThreadManager = Arc::new(RuntimeThreadManager::open(
Config::default(),
workspace.clone(),
RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime")),
RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime"))
.with_sessions_dir(sessions_dir.clone()),
)?);
runtime_threads.attach_task_manager(manager.clone());
let automations = Arc::new(Mutex::new(AutomationManager::open(
Expand Down Expand Up @@ -5289,6 +5419,77 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn session_resume_thread_engine_load_preserves_thinking_from_custom_sessions_dir()
-> Result<()> {
let root = std::env::temp_dir().join(format!(
"deepseek-session-resume-thinking-{}",
Uuid::new_v4()
));
let sessions_dir = root.join("sessions");
fs::create_dir_all(&sessions_dir)?;
let session = json!({
"schema_version": 1,
"metadata": {
"id": "sess_test_resume_thinking",
"title": "Test resume session with thinking",
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:10:00Z",
"message_count": 2,
"total_tokens": 100,
"model": "deepseek-v4-pro",
"workspace": "/tmp/test",
"mode": "agent"
},
"messages": [
{
"role": "user",
"content": [{ "type": "text", "text": "Hello, world!" }]
},
{
"role": "assistant",
"content": [{ "type": "thinking", "thinking": "internal chain" }]
}
],
"system_prompt": null
});
fs::write(
sessions_dir.join("sess_test_resume_thinking.json"),
serde_json::to_string_pretty(&session)?,
)?;

let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir.clone()).await?
else {
return Ok(());
};
let client = crate::tls::reqwest_client();

let resp = client
.post(format!(
"http://{addr}/v1/sessions/sess_test_resume_thinking/resume-thread"
))
.json(&json!({ "model": "deepseek-v4-pro" }))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let resumed: serde_json::Value = resp.json().await?;
let thread_id = resumed["thread_id"]
.as_str()
.context("missing resumed thread id")?;

let engine = runtime_threads.get_engine(thread_id).await?;
let snapshot = engine.get_session_snapshot().await?;
assert_eq!(snapshot.messages.len(), 2);
assert!(matches!(
&snapshot.messages[1].content[0],
ContentBlock::Thinking { thinking, .. } if thinking == "internal chain"
));

handle.abort();
Ok(())
}

#[tokio::test]
async fn session_create_from_completed_thread_saves_messages() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-thread-session-{}", Uuid::new_v4()));
Expand Down
Loading
Loading