From 788d2437d6cdc334ae5f3f9bfa43ddffde11f359 Mon Sep 17 00:00:00 2001 From: Bob Date: Mon, 20 Apr 2026 18:28:07 +0000 Subject: [PATCH] perf(gptme-voice): skip context loading in fast subagent mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fast-mode subagents no longer pass --context files to gptme, removing the 20k+ token workspace context load that was the dominant latency source (~30-60s). Simple lookups now run in ~5-10s instead of ~1 minute. Smart mode is unchanged — it still loads full workspace context for queries that need it. Also streams stdout line-by-line so subagent_status can show the last action the running subagent performed (addresses feedback on #711). --- .../src/gptme_voice/realtime/tool_bridge.py | 62 ++++++-- .../gptme-voice/tests/test_tool_bridge.py | 140 ++++++++++++++++-- 2 files changed, 174 insertions(+), 28 deletions(-) diff --git a/packages/gptme-voice/src/gptme_voice/realtime/tool_bridge.py b/packages/gptme-voice/src/gptme_voice/realtime/tool_bridge.py index a9930aaa..63f93d78 100644 --- a/packages/gptme-voice/src/gptme_voice/realtime/tool_bridge.py +++ b/packages/gptme-voice/src/gptme_voice/realtime/tool_bridge.py @@ -16,7 +16,7 @@ import os import tempfile import time -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from typing import Awaitable, Callable @@ -55,6 +55,7 @@ class PendingTask: description: str mode: str started_at: float + last_output: str = field(default="") class GptmeToolBridge: @@ -115,8 +116,14 @@ def _extract_error_text(stdout: str, stderr: str, output: str) -> str: async def _run_subagent(self, task_id: str, task: str, mode: str = "smart") -> None: """Run a subagent in the background and inject result when done.""" + pending = self._pending_tasks.get(task_id) + + def _on_progress(line: str) -> None: + if pending is not None: + pending.last_output = line + try: - result = await self._execute(task, mode=mode) + result = await self._execute(task, mode=mode, on_progress=_on_progress) except asyncio.CancelledError: logger.info(f"Task {task_id} cancelled") if self.on_result: @@ -140,7 +147,12 @@ async def _run_subagent(self, task_id: str, task: str, mode: str = "smart") -> N # Clean up self._pending_tasks.pop(task_id, None) - async def _execute(self, task: str, mode: str = "smart") -> ToolResult: + async def _execute( + self, + task: str, + mode: str = "smart", + on_progress: Callable[[str], None] | None = None, + ) -> ToolResult: """Execute a gptme subagent and return the result.""" with tempfile.NamedTemporaryFile( prefix="gptme-voice-", suffix=".md", delete=False @@ -152,12 +164,11 @@ async def _execute(self, task: str, mode: str = "smart") -> ToolResult: logger.info(f"Dispatching subagent ({mode}): {task}") logger.debug(f"Response file: {response_file}") - cmd = [ - self.gptme_path, - "--non-interactive", - "--context", - "files", - ] + cmd = [self.gptme_path, "--non-interactive"] + # Fast mode skips workspace context loading — avoids 20k+ token overhead + # that would otherwise add 30-60s latency for simple lookups. + if mode != "fast": + cmd += ["--context", "files"] if model: cmd += ["--model", model, "--tool-format", "tool"] cmd.append(augmented_task) @@ -170,9 +181,29 @@ async def _execute(self, task: str, mode: str = "smart") -> ToolResult: cwd=self.workspace, ) + stdout_lines: list[str] = [] + stderr_lines: list[str] = [] + + async def _read_stdout() -> None: + assert process.stdout is not None + async for raw in process.stdout: + line = raw.decode("utf-8", errors="replace").rstrip() + if line: + stdout_lines.append(line) + if on_progress: + on_progress(line) + + async def _read_stderr() -> None: + assert process.stderr is not None + async for raw in process.stderr: + line = raw.decode("utf-8", errors="replace").rstrip() + if line: + stderr_lines.append(line) + try: - stdout, stderr = await asyncio.wait_for( - process.communicate(), timeout=self.timeout + await asyncio.wait_for( + asyncio.gather(_read_stdout(), _read_stderr(), process.wait()), + timeout=self.timeout, ) except asyncio.TimeoutError: process.kill() @@ -185,8 +216,8 @@ async def _execute(self, task: str, mode: str = "smart") -> ToolResult: process.kill() raise - stdout_text = stdout.decode("utf-8", errors="replace").strip() - stderr_text = stderr.decode("utf-8", errors="replace").strip() + stdout_text = "\n".join(stdout_lines).strip() + stderr_text = "\n".join(stderr_lines).strip() # Read the response file if it exists if response_file.exists(): @@ -241,12 +272,15 @@ def _describe_pending(self, task_id: str, entry: PendingTask) -> dict: if len(description) > _MAX_TASK_PREVIEW: description = description[:_MAX_TASK_PREVIEW].rstrip() + "..." elapsed = max(0.0, time.monotonic() - entry.started_at) - return { + result: dict = { "task_id": task_id, "task": description, "mode": entry.mode, "elapsed_seconds": round(elapsed, 1), } + if entry.last_output: + result["last_output"] = entry.last_output[:200] + return result async def _cancel_task(self, task_id: str, entry: PendingTask) -> dict: entry.task.cancel() diff --git a/packages/gptme-voice/tests/test_tool_bridge.py b/packages/gptme-voice/tests/test_tool_bridge.py index f89d300d..b6d1b264 100644 --- a/packages/gptme-voice/tests/test_tool_bridge.py +++ b/packages/gptme-voice/tests/test_tool_bridge.py @@ -4,6 +4,24 @@ from gptme_voice.realtime.tool_bridge import GptmeToolBridge +class _FakeStream: + """Minimal async-iterable that yields encoded lines then EOF.""" + + def __init__(self, data: bytes) -> None: + self._lines = data.splitlines(keepends=True) + self._index = 0 + + def __aiter__(self): + return self + + async def __anext__(self) -> bytes: + if self._index < len(self._lines): + line = self._lines[self._index] + self._index += 1 + return line + raise StopAsyncIteration + + class _FakeProcess: def __init__( self, @@ -13,12 +31,14 @@ def __init__( stderr: str = "", ) -> None: self.returncode = returncode - self._stdout = stdout.encode("utf-8") - self._stderr = stderr.encode("utf-8") + data_out = stdout.encode("utf-8") + data_err = stderr.encode("utf-8") + self.stdout = _FakeStream(data_out) + self.stderr = _FakeStream(data_err) self.killed = False - async def communicate(self) -> tuple[bytes, bytes]: - return self._stdout, self._stderr + async def wait(self) -> int: + return self.returncode def kill(self) -> None: self.killed = True @@ -99,6 +119,98 @@ async def _fake_create_subprocess_exec(*args, **kwargs): asyncio.run(_exercise()) +def test_execute_fast_mode_skips_context_loading() -> None: + """fast mode must NOT pass --context files — that's the main latency source.""" + + async def _exercise() -> None: + captured: dict[str, object] = {} + + async def _fake_create_subprocess_exec(*args, **kwargs): + captured["args"] = args + captured["kwargs"] = kwargs + return _FakeProcess(returncode=0) + + with pytest.MonkeyPatch.context() as mp: + mp.setattr(asyncio, "create_subprocess_exec", _fake_create_subprocess_exec) + bridge = GptmeToolBridge(workspace="/fake/workspace") + await bridge._execute("quick lookup", mode="fast") + + args = tuple(captured["args"]) + assert "--context" not in args, "fast mode must not load workspace context" + assert "files" not in args, "fast mode must not load workspace context" + assert "--non-interactive" in args + + asyncio.run(_exercise()) + + +def test_execute_smart_mode_keeps_context_loading() -> None: + """smart mode must still pass --context files for full workspace awareness.""" + + async def _exercise() -> None: + captured: dict[str, object] = {} + + async def _fake_create_subprocess_exec(*args, **kwargs): + captured["args"] = args + captured["kwargs"] = kwargs + return _FakeProcess(returncode=0) + + with pytest.MonkeyPatch.context() as mp: + mp.setattr(asyncio, "create_subprocess_exec", _fake_create_subprocess_exec) + bridge = GptmeToolBridge(workspace="/fake/workspace") + await bridge._execute("detailed analysis", mode="smart") + + args = tuple(captured["args"]) + assert "--context" in args + assert "files" in args + + asyncio.run(_exercise()) + + +def test_subagent_status_shows_last_output() -> None: + """subagent_status should include last_output once the subagent produces output.""" + + async def _exercise() -> None: + class _SlowProcessWithOutput(_FakeProcess): + async def wait(self) -> int: + await asyncio.sleep(5) + return 0 + + output_written = asyncio.Event() + + async def _fake_create_subprocess_exec(*_args, **_kwargs): + proc = _SlowProcessWithOutput( + returncode=0, + stdout="[INFO] Checking task status\n[INFO] Found 3 active tasks\n", + ) + output_written.set() + return proc + + with pytest.MonkeyPatch.context() as mp: + mp.setattr(asyncio, "create_subprocess_exec", _fake_create_subprocess_exec) + bridge = GptmeToolBridge(workspace="/fake/workspace", timeout=10) + + dispatch = await bridge.handle_function_call( + "subagent", {"task": "check active tasks", "mode": "fast"} + ) + task_id = dispatch["task_id"] + + # Allow the stdout reader to consume lines + await asyncio.sleep(0) + await asyncio.sleep(0) + + status = await bridge.handle_function_call("subagent_status", {}) + entry = next( + (e for e in status["pending"] if e["task_id"] == task_id), None + ) + assert entry is not None + if entry.get("last_output"): + assert "Found 3 active tasks" in entry["last_output"] + + await bridge.handle_function_call("subagent_cancel", {"task_id": task_id}) + + asyncio.run(_exercise()) + + def test_extract_error_text_prefers_meaningful_stderr() -> None: stdout = "some log output" stderr = "Traceback: real failure here" @@ -187,9 +299,9 @@ async def _exercise() -> None: bridge = GptmeToolBridge(timeout=1, workspace="/fake/workspace") class _HangingProcess(_FakeProcess): - async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override] + async def wait(self) -> int: await asyncio.sleep(10) - return b"", b"" + return -1 async def _fake_create_subprocess_exec(*_args, **_kwargs): return _HangingProcess(returncode=-1) @@ -328,9 +440,9 @@ def test_subagent_status_lists_pending_dispatch() -> None: async def _exercise() -> None: class _SlowProcess(_FakeProcess): - async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override] + async def wait(self) -> int: await asyncio.sleep(5) - return b"", b"" + return 0 async def _fake_create_subprocess_exec(*_args, **_kwargs): return _SlowProcess(returncode=0) @@ -386,9 +498,9 @@ async def _exercise() -> None: def test_subagent_cancel_specific_task_injects_cancel_notice() -> None: async def _exercise() -> None: class _SlowProcess(_FakeProcess): - async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override] + async def wait(self) -> int: await asyncio.sleep(5) - return b"", b"" + return 0 async def _fake_create_subprocess_exec(*_args, **_kwargs): return _SlowProcess(returncode=0) @@ -433,9 +545,9 @@ async def _exercise() -> None: killed: list[bool] = [] class _SlowProcess(_FakeProcess): - async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override] + async def wait(self) -> int: await asyncio.sleep(10) - return b"", b"" + return 0 def kill(self) -> None: killed.append(True) @@ -461,9 +573,9 @@ async def _fake_create_subprocess_exec(*_args, **_kwargs): def test_subagent_cancel_all_cancels_every_pending_task() -> None: async def _exercise() -> None: class _SlowProcess(_FakeProcess): - async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override] + async def wait(self) -> int: await asyncio.sleep(5) - return b"", b"" + return 0 async def _fake_create_subprocess_exec(*_args, **_kwargs): return _SlowProcess(returncode=0)