Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 48 additions & 14 deletions packages/gptme-voice/src/gptme_voice/realtime/tool_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import os
import tempfile
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import Awaitable, Callable

Expand Down Expand Up @@ -55,6 +55,7 @@ class PendingTask:
description: str
mode: str
started_at: float
last_output: str = field(default="")


class GptmeToolBridge:
Expand Down Expand Up @@ -115,8 +116,14 @@ def _extract_error_text(stdout: str, stderr: str, output: str) -> str:

async def _run_subagent(self, task_id: str, task: str, mode: str = "smart") -> None:
"""Run a subagent in the background and inject result when done."""
pending = self._pending_tasks.get(task_id)

def _on_progress(line: str) -> None:
if pending is not None:
pending.last_output = line

try:
result = await self._execute(task, mode=mode)
result = await self._execute(task, mode=mode, on_progress=_on_progress)
except asyncio.CancelledError:
logger.info(f"Task {task_id} cancelled")
if self.on_result:
Expand All @@ -140,7 +147,12 @@ async def _run_subagent(self, task_id: str, task: str, mode: str = "smart") -> N
# Clean up
self._pending_tasks.pop(task_id, None)

async def _execute(self, task: str, mode: str = "smart") -> ToolResult:
async def _execute(
self,
task: str,
mode: str = "smart",
on_progress: Callable[[str], None] | None = None,
) -> ToolResult:
"""Execute a gptme subagent and return the result."""
with tempfile.NamedTemporaryFile(
prefix="gptme-voice-", suffix=".md", delete=False
Expand All @@ -152,12 +164,11 @@ async def _execute(self, task: str, mode: str = "smart") -> ToolResult:
logger.info(f"Dispatching subagent ({mode}): {task}")
logger.debug(f"Response file: {response_file}")

cmd = [
self.gptme_path,
"--non-interactive",
"--context",
"files",
]
cmd = [self.gptme_path, "--non-interactive"]
# Fast mode skips workspace context loading — avoids 20k+ token overhead
# that would otherwise add 30-60s latency for simple lookups.
if mode != "fast":
cmd += ["--context", "files"]
if model:
cmd += ["--model", model, "--tool-format", "tool"]
cmd.append(augmented_task)
Expand All @@ -170,9 +181,29 @@ async def _execute(self, task: str, mode: str = "smart") -> ToolResult:
cwd=self.workspace,
)

stdout_lines: list[str] = []
stderr_lines: list[str] = []

async def _read_stdout() -> None:
assert process.stdout is not None
async for raw in process.stdout:
line = raw.decode("utf-8", errors="replace").rstrip()
if line:
stdout_lines.append(line)
if on_progress:
on_progress(line)

async def _read_stderr() -> None:
assert process.stderr is not None
async for raw in process.stderr:
line = raw.decode("utf-8", errors="replace").rstrip()
if line:
stderr_lines.append(line)

try:
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=self.timeout
await asyncio.wait_for(
asyncio.gather(_read_stdout(), _read_stderr(), process.wait()),
timeout=self.timeout,
)
except asyncio.TimeoutError:
process.kill()
Expand All @@ -185,8 +216,8 @@ async def _execute(self, task: str, mode: str = "smart") -> ToolResult:
process.kill()
raise

stdout_text = stdout.decode("utf-8", errors="replace").strip()
stderr_text = stderr.decode("utf-8", errors="replace").strip()
stdout_text = "\n".join(stdout_lines).strip()
stderr_text = "\n".join(stderr_lines).strip()

# Read the response file if it exists
if response_file.exists():
Expand Down Expand Up @@ -241,12 +272,15 @@ def _describe_pending(self, task_id: str, entry: PendingTask) -> dict:
if len(description) > _MAX_TASK_PREVIEW:
description = description[:_MAX_TASK_PREVIEW].rstrip() + "..."
elapsed = max(0.0, time.monotonic() - entry.started_at)
return {
result: dict = {
"task_id": task_id,
"task": description,
"mode": entry.mode,
"elapsed_seconds": round(elapsed, 1),
}
if entry.last_output:
result["last_output"] = entry.last_output[:200]
return result

async def _cancel_task(self, task_id: str, entry: PendingTask) -> dict:
entry.task.cancel()
Expand Down
140 changes: 126 additions & 14 deletions packages/gptme-voice/tests/test_tool_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,24 @@
from gptme_voice.realtime.tool_bridge import GptmeToolBridge


class _FakeStream:
"""Minimal async-iterable that yields encoded lines then EOF."""

def __init__(self, data: bytes) -> None:
self._lines = data.splitlines(keepends=True)
self._index = 0

def __aiter__(self):
return self

async def __anext__(self) -> bytes:
if self._index < len(self._lines):
line = self._lines[self._index]
self._index += 1
return line
raise StopAsyncIteration


class _FakeProcess:
def __init__(
self,
Expand All @@ -13,12 +31,14 @@ def __init__(
stderr: str = "",
) -> None:
self.returncode = returncode
self._stdout = stdout.encode("utf-8")
self._stderr = stderr.encode("utf-8")
data_out = stdout.encode("utf-8")
data_err = stderr.encode("utf-8")
self.stdout = _FakeStream(data_out)
self.stderr = _FakeStream(data_err)
self.killed = False

async def communicate(self) -> tuple[bytes, bytes]:
return self._stdout, self._stderr
async def wait(self) -> int:
return self.returncode

def kill(self) -> None:
self.killed = True
Expand Down Expand Up @@ -99,6 +119,98 @@ async def _fake_create_subprocess_exec(*args, **kwargs):
asyncio.run(_exercise())


def test_execute_fast_mode_skips_context_loading() -> None:
"""fast mode must NOT pass --context files — that's the main latency source."""

async def _exercise() -> None:
captured: dict[str, object] = {}

async def _fake_create_subprocess_exec(*args, **kwargs):
captured["args"] = args
captured["kwargs"] = kwargs
return _FakeProcess(returncode=0)

with pytest.MonkeyPatch.context() as mp:
mp.setattr(asyncio, "create_subprocess_exec", _fake_create_subprocess_exec)
bridge = GptmeToolBridge(workspace="/fake/workspace")
await bridge._execute("quick lookup", mode="fast")

args = tuple(captured["args"])
assert "--context" not in args, "fast mode must not load workspace context"
assert "files" not in args, "fast mode must not load workspace context"
assert "--non-interactive" in args

asyncio.run(_exercise())


def test_execute_smart_mode_keeps_context_loading() -> None:
"""smart mode must still pass --context files for full workspace awareness."""

async def _exercise() -> None:
captured: dict[str, object] = {}

async def _fake_create_subprocess_exec(*args, **kwargs):
captured["args"] = args
captured["kwargs"] = kwargs
return _FakeProcess(returncode=0)

with pytest.MonkeyPatch.context() as mp:
mp.setattr(asyncio, "create_subprocess_exec", _fake_create_subprocess_exec)
bridge = GptmeToolBridge(workspace="/fake/workspace")
await bridge._execute("detailed analysis", mode="smart")

args = tuple(captured["args"])
assert "--context" in args
assert "files" in args

asyncio.run(_exercise())


def test_subagent_status_shows_last_output() -> None:
"""subagent_status should include last_output once the subagent produces output."""

async def _exercise() -> None:
class _SlowProcessWithOutput(_FakeProcess):
async def wait(self) -> int:
await asyncio.sleep(5)
return 0

output_written = asyncio.Event()

async def _fake_create_subprocess_exec(*_args, **_kwargs):
proc = _SlowProcessWithOutput(
returncode=0,
stdout="[INFO] Checking task status\n[INFO] Found 3 active tasks\n",
)
output_written.set()
return proc

with pytest.MonkeyPatch.context() as mp:
mp.setattr(asyncio, "create_subprocess_exec", _fake_create_subprocess_exec)
bridge = GptmeToolBridge(workspace="/fake/workspace", timeout=10)

dispatch = await bridge.handle_function_call(
"subagent", {"task": "check active tasks", "mode": "fast"}
)
task_id = dispatch["task_id"]

# Allow the stdout reader to consume lines
await asyncio.sleep(0)
await asyncio.sleep(0)

status = await bridge.handle_function_call("subagent_status", {})
entry = next(
(e for e in status["pending"] if e["task_id"] == task_id), None
)
assert entry is not None
if entry.get("last_output"):
assert "Found 3 active tasks" in entry["last_output"]
Comment on lines +206 to +207
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Weak assertion makes the test self-defeating

The central claim of this test is that last_output is populated once the subagent produces stdout lines, but the assertion is guarded by if entry.get("last_output"):. If last_output is always absent or empty — the exact regression being tested against — the assert on line 207 is simply never reached and the test passes silently.

Two asyncio.sleep(0) yields may not be enough for the async stdout reader to have fully iterated; if the reader hasn't run yet, last_output is "" and the branch is skipped entirely. Consider promoting this to an unconditional assert:

Suggested change
if entry.get("last_output"):
assert "Found 3 active tasks" in entry["last_output"]
assert entry.get("last_output"), "last_output must be set once subagent produces stdout"
assert "Found 3 active tasks" in entry["last_output"]


await bridge.handle_function_call("subagent_cancel", {"task_id": task_id})

asyncio.run(_exercise())


def test_extract_error_text_prefers_meaningful_stderr() -> None:
stdout = "some log output"
stderr = "Traceback: real failure here"
Expand Down Expand Up @@ -187,9 +299,9 @@ async def _exercise() -> None:
bridge = GptmeToolBridge(timeout=1, workspace="/fake/workspace")

class _HangingProcess(_FakeProcess):
async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override]
async def wait(self) -> int:
await asyncio.sleep(10)
return b"", b""
return -1

async def _fake_create_subprocess_exec(*_args, **_kwargs):
return _HangingProcess(returncode=-1)
Expand Down Expand Up @@ -328,9 +440,9 @@ def test_subagent_status_lists_pending_dispatch() -> None:

async def _exercise() -> None:
class _SlowProcess(_FakeProcess):
async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override]
async def wait(self) -> int:
await asyncio.sleep(5)
return b"", b""
return 0

async def _fake_create_subprocess_exec(*_args, **_kwargs):
return _SlowProcess(returncode=0)
Expand Down Expand Up @@ -386,9 +498,9 @@ async def _exercise() -> None:
def test_subagent_cancel_specific_task_injects_cancel_notice() -> None:
async def _exercise() -> None:
class _SlowProcess(_FakeProcess):
async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override]
async def wait(self) -> int:
await asyncio.sleep(5)
return b"", b""
return 0

async def _fake_create_subprocess_exec(*_args, **_kwargs):
return _SlowProcess(returncode=0)
Expand Down Expand Up @@ -433,9 +545,9 @@ async def _exercise() -> None:
killed: list[bool] = []

class _SlowProcess(_FakeProcess):
async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override]
async def wait(self) -> int:
await asyncio.sleep(10)
return b"", b""
return 0

def kill(self) -> None:
killed.append(True)
Expand All @@ -461,9 +573,9 @@ async def _fake_create_subprocess_exec(*_args, **_kwargs):
def test_subagent_cancel_all_cancels_every_pending_task() -> None:
async def _exercise() -> None:
class _SlowProcess(_FakeProcess):
async def communicate(self) -> tuple[bytes, bytes]: # type: ignore[override]
async def wait(self) -> int:
await asyncio.sleep(5)
return b"", b""
return 0

async def _fake_create_subprocess_exec(*_args, **_kwargs):
return _SlowProcess(returncode=0)
Expand Down
Loading