Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 139 additions & 18 deletions crates/tui/src/runtime_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,16 @@ async fn resume_session_thread(
.await
.map_err(|e| ApiError::internal(format!("Failed to seed thread history: {e}")))?;

// Link the session to the new thread so that `ensure_engine_loaded`
// can restore the full message history from the session file.
if let Err(e) = state
.runtime_threads
.set_thread_session_id(&thread.id, &id)
.await
{
tracing::warn!("Failed to link session {id} to thread {}: {e}", thread.id);
}

let summary = format!(
"Resumed session '{}' ({} messages) into thread {}",
session.metadata.title, msg_count, thread.id
Expand Down Expand Up @@ -1014,6 +1024,19 @@ async fn create_session_from_thread(
.save_session(&session)
.map_err(|e| ApiError::internal(format!("Failed to save session: {e}")))?;

// Link the session to the thread so that `ensure_engine_loaded` can
// restore the full message history from the session file.
if let Err(e) = state
.runtime_threads
.set_thread_session_id(&detail.thread.id, &session_id)
.await
{
tracing::warn!(
"Failed to link session {session_id} to thread {}: {e}",
detail.thread.id
);
}

Ok((
StatusCode::CREATED,
Json(CreateSessionResponse {
Expand Down Expand Up @@ -1048,29 +1071,115 @@ fn messages_from_thread_detail(detail: &ThreadDetail) -> Vec<Message> {
let mut messages = Vec::new();

for turn in &detail.turns {
// Collect content blocks for the current assistant message.
// Multiple items (AgentMessage, AgentReasoning, ToolCall) may
// belong to the same assistant message, so we batch them.
let mut assistant_blocks: Vec<ContentBlock> = Vec::new();
let flush_assistant = |blocks: &mut Vec<ContentBlock>, msgs: &mut Vec<Message>| {
if !blocks.is_empty() {
msgs.push(Message {
role: "assistant".to_string(),
content: std::mem::take(blocks),
});
}
};

for item_id in &turn.item_ids {
let Some(item) = items_by_id.get(item_id.as_str()) else {
continue;
};
let role = match item.kind {
TurnItemKind::UserMessage => "user",
TurnItemKind::AgentMessage => "assistant",
_ => continue,
};
let Some(text) = item.detail.as_deref().map(str::trim) else {
continue;
};
if text.is_empty() {
continue;
match item.kind {
TurnItemKind::UserMessage => {
// Flush any pending assistant blocks before starting a
// new user message.
flush_assistant(&mut assistant_blocks, &mut messages);

let text = item.detail.as_deref().map(str::trim).unwrap_or("");
if !text.is_empty() {
messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: text.to_string(),
cache_control: None,
}],
});
}
}
TurnItemKind::AgentMessage => {
let text = item.detail.as_deref().map(str::trim).unwrap_or("");
if !text.is_empty() {
assistant_blocks.push(ContentBlock::Text {
text: text.to_string(),
cache_control: None,
});
}
}
TurnItemKind::AgentReasoning => {
let thinking = item.detail.as_deref().map(str::trim).unwrap_or("");
if !thinking.is_empty() {
assistant_blocks.push(ContentBlock::Thinking {
thinking: thinking.to_string(),
signature: None,
});
}
}
TurnItemKind::ToolCall => {
// Check metadata to distinguish tool_use from tool_result.
let meta = item.metadata.as_ref();
let is_tool_result = meta.and_then(|m| m.get("tool_result_for")).is_some();
if is_tool_result {
// tool_result blocks go in a user message.
// Flush any pending assistant blocks first.
flush_assistant(&mut assistant_blocks, &mut messages);

let tool_use_id = meta
.and_then(|m| m.get("tool_result_for"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let content = item.detail.as_deref().unwrap_or("").to_string();
let is_error = meta
.and_then(|m| m.get("is_error"))
.and_then(|v| v.as_bool())
.unwrap_or(false);
messages.push(Message {
role: "user".to_string(),
content: vec![ContentBlock::ToolResult {
tool_use_id,
content,
is_error: if is_error { Some(true) } else { None },
content_blocks: None,
}],
});
} else {
// tool_use block — part of assistant message.
let tool_use_id = meta
.and_then(|m| m.get("tool_use_id"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let tool_name = meta
.and_then(|m| m.get("tool_name"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let input_str = item.detail.as_deref().unwrap_or("{}");
let input: serde_json::Value =
serde_json::from_str(input_str).unwrap_or(serde_json::Value::Null);
assistant_blocks.push(ContentBlock::ToolUse {
id: tool_use_id,
name: tool_name,
input,
caller: None,
});
}
}
// Skip other item kinds (file_change, command_execution, etc.)
_ => {}
}
messages.push(Message {
role: role.to_string(),
content: vec![ContentBlock::Text {
text: text.to_string(),
cache_control: None,
}],
});
}
// Flush any remaining assistant blocks.
flush_assistant(&mut assistant_blocks, &mut messages);
}
Comment on lines 1073 to 1183

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In messages_from_thread_detail, consecutive tool_result blocks (and UserMessage blocks) are not batched into a single user message. This results in consecutive user messages being saved to the session file, which violates LLM provider contracts (e.g., Anthropic, OpenAI) that forbid back-to-back user messages. Batching them into user_blocks and flushing them at turn boundaries (similar to reconstruct_messages_from_turns) ensures API contract compliance and consistency.

    for turn in &detail.turns {
        let mut user_blocks: Vec<ContentBlock> = Vec::new();
        // Collect content blocks for the current assistant message.
        // Multiple items (AgentMessage, AgentReasoning, ToolCall) may
        // belong to the same assistant message, so we batch them.
        let mut assistant_blocks: Vec<ContentBlock> = Vec::new();
        let flush_user = |blocks: &mut Vec<ContentBlock>, msgs: &mut Vec<Message>| {
            if !blocks.is_empty() {
                msgs.push(Message {
                    role: "user".to_string(),
                    content: std::mem::take(blocks),
                });
            }
        };
        let flush_assistant = |blocks: &mut Vec<ContentBlock>, msgs: &mut Vec<Message>| {
            if !blocks.is_empty() {
                msgs.push(Message {
                    role: "assistant".to_string(),
                    content: std::mem::take(blocks),
                });
            }
        };

        for item_id in &turn.item_ids {
            let Some(item) = items_by_id.get(item_id.as_str()) else {
                continue;
            };
            match item.kind {
                TurnItemKind::UserMessage => {
                    // Flush any pending assistant blocks before starting a
                    // new user message.
                    flush_assistant(&mut assistant_blocks, &mut messages);

                    let text = item.detail.as_deref().map(str::trim).unwrap_or("");
                    if !text.is_empty() {
                        user_blocks.push(ContentBlock::Text {
                            text: text.to_string(),
                            cache_control: None,
                        });
                    }
                }
                TurnItemKind::AgentMessage => {
                    flush_user(&mut user_blocks, &mut messages);

                    let text = item.detail.as_deref().map(str::trim).unwrap_or("");
                    if !text.is_empty() {
                        assistant_blocks.push(ContentBlock::Text {
                            text: text.to_string(),
                            cache_control: None,
                        });
                    }
                }
                TurnItemKind::AgentReasoning => {
                    flush_user(&mut user_blocks, &mut messages);

                    let thinking = item.detail.as_deref().map(str::trim).unwrap_or("");
                    if !thinking.is_empty() {
                        assistant_blocks.push(ContentBlock::Thinking {
                            thinking: thinking.to_string(),
                            signature: None,
                        });
                    }
                }
                TurnItemKind::ToolCall => {
                    // Check metadata to distinguish tool_use from tool_result.
                    let meta = item.metadata.as_ref();
                    let is_tool_result = meta.and_then(|m| m.get("tool_result_for")).is_some();
                    if is_tool_result {
                        // tool_result blocks go in a user message.
                        // Flush any pending assistant blocks first.
                        flush_assistant(&mut assistant_blocks, &mut messages);

                        let tool_use_id = meta
                            .and_then(|m| m.get("tool_result_for"))
                            .and_then(|v| v.as_str())
                            .unwrap_or("")
                            .to_string();
                        let content = item.detail.as_deref().unwrap_or("").to_string();
                        let is_error = meta
                            .and_then(|m| m.get("is_error"))
                            .and_then(|v| v.as_bool())
                            .unwrap_or(false);
                        user_blocks.push(ContentBlock::ToolResult {
                            tool_use_id,
                            content,
                            is_error: if is_error { Some(true) } else { None },
                            content_blocks: None,
                        });
                    } else {
                        flush_user(&mut user_blocks, &mut messages);

                        // tool_use block — part of assistant message.
                        let tool_use_id = meta
                            .and_then(|m| m.get("tool_use_id"))
                            .and_then(|v| v.as_str())
                            .unwrap_or("")
                            .to_string();
                        let tool_name = meta
                            .and_then(|m| m.get("tool_name"))
                            .and_then(|v| v.as_str())
                            .unwrap_or("")
                            .to_string();
                        let input_str = item.detail.as_deref().unwrap_or("{}");
                        let input: serde_json::Value =
                            serde_json::from_str(input_str).unwrap_or(serde_json::Value::Null);
                        assistant_blocks.push(ContentBlock::ToolUse {
                            id: tool_use_id,
                            name: tool_name,
                            input,
                            caller: None,
                        });
                    }
                }
                // Skip other item kinds (file_change, command_execution, etc.)
                _ => {}
            }
        }
        // Flush any remaining assistant blocks.
        flush_assistant(&mut assistant_blocks, &mut messages);
        flush_user(&mut user_blocks, &mut messages);
    }


messages
Expand Down Expand Up @@ -1193,8 +1302,20 @@ async fn save_current_session(
.save_session(&session)
.map_err(|e| ApiError::internal(format!("Failed to save session: {e}")))?;

// Link the session to the thread so that `ensure_engine_loaded` can
// restore the full message history (including thinking/tool blocks)
// from the session file instead of reconstructing from turns.
let session_id = session.metadata.id.clone();
if let Err(e) = state
.runtime_threads
.set_thread_session_id(&thread_id, &session_id)
.await
{
tracing::warn!("Failed to link session {session_id} to thread {thread_id}: {e}");
}

Ok(Json(SaveSessionResponse {
session_id: session.metadata.id.clone(),
session_id,
session: session_to_detail(session),
}))
}
Expand Down
Loading
Loading