From 14637cd79fd90365632691557bf6eb7cfb20cb64 Mon Sep 17 00:00:00 2001 From: doudouOUC Date: Wed, 20 May 2026 20:29:37 +0800 Subject: [PATCH 1/6] feat(serve): stamp serverTimestamp / tool provenance / errorKind on daemon events (#4175 F4 prereq) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adopts chiga0's three P0 SDK-side blockers from #4175 comment #19 — the SDK side already consumes these fields (PR #4353), but daemon hadn't stamped them yet, leaving the corresponding UI affordances inert. All three stampings are purely additive on the wire and don't require any SDK type changes (SDK already has forward-compat field slots). **#19.1 — `_meta.serverTimestamp` on every SSE frame** (`server.ts` `formatSseFrame()`) Stamped at the SSE write boundary (NOT EventBus.publish) so the in-memory `BridgeEvent` type stays unchanged and internal consumers don't see `_meta`. Pre-existing `_meta` keys (e.g. tool_call's `_meta.toolName`) are preserved via spread merge. SDK reads via the 3-location probe in `extractServerTimestamp` (chiga0's PR #4353); we pick `_meta.serverTimestamp` (Anthropic convention) so top-level event type stays unpolluted. Why this matters: pre-fix, multi-client UIs showing "X minutes ago" or sorting transcript blocks by emit time used each client's local clock — drifts of tens of seconds to minutes across browsers/tabs/mobile produced visibly inconsistent timestamps. **#19.2 — `tool_call` `provenance` + `serverId` on every emitter event** (`ToolCallEmitter.ts`) New static `ToolCallEmitter.resolveToolProvenance(toolName, subagentMeta)` returns `{ provenance: 'builtin' | 'mcp' | 'subagent'; serverId? }`. Resolution rules (per user-confirmed design decision from issue comment): subagent takes precedence (set when subagentMeta is present); the `mcp__<server>__<tool>` naming heuristic classifies MCP tools and stamps `serverId` from the `<server>` segment; everything else is builtin.
Stamped on `emitStart` AND `emitResult` AND `emitError` (all three emit paths) so a reconnecting client receiving a `tool_call_update` frame from the replay ring (without the original `tool_call` start event) can still derive the provenance. Provenance is stable per tool, so stamping on every event is redundant — but the marginal serialization cost is tiny and reconnect correctness wins. Chose the naming heuristic (not ToolRegistry lookup) per user confirmation: matches the SDK's own fallback (chiga0 PR #4353), no new ctx-dep on emit hot path, no signature changes. **#19.3 — `errorKind` on `stream_error`** (`server.ts` line ~1955) Stamped via `mapDomainErrorToErrorKind(err)` — the 7-value classifier already lives in `@qwen-code/acp-bridge/status.ts` since #4319. When the classifier returns `undefined` (generic Error etc.) the field is omitted — strictly additive. SDK consumers handle "errorKind absent" as before (fall back to rendering `error` text). NOT stamped on `session_died` because the 3 emit sites in `acp-bridge/bridge.ts` don't have a classifiable `err` in scope: - `channel_closed` carries only exitCode/signalCode (no error) - `killed` is user-initiated (no domain error) - `daemon_shutdown` is operator-initiated (no domain error) A follow-up could thread channel-spawn errors through to the session_died emit site to enable `errorKind: 'init_timeout'` / `missing_binary` classification — left for a separate PR to avoid mixing protocol stamping with lifecycle plumbing.
Verification - `npx vitest run packages/cli/src/serve/server.test.ts -t "serverTimestamp|stream_error|errorKind"` — 5 pass - `npx vitest run packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts` — 46 pass (+ 11 new tests for resolveToolProvenance + provenance stamping on all 3 emit paths) - `npx vitest run packages/cli/src/acp-integration/session/HistoryReplayer.test.ts` — 17 pass - TypeScript clean on touched regions; pre-existing F3 (#4335 merge) errors elsewhere are unrelated. Existing test updates - 15 `_meta: { toolName: 'X' }` assertions in ToolCallEmitter.test.ts updated to include `provenance: 'builtin'` (defensive — catches accidental drift if a future refactor stops stamping). 2 strict-equality assertions in HistoryReplayer.test.ts similarly updated. The first SSE-frame test in server.test.ts switched from `toEqual` to `toMatchObject` since `_meta.serverTimestamp` makes exact equality brittle; a dedicated test pins the new field's shape. --- .../session/HistoryReplayer.test.ts | 7 + .../session/emitters/ToolCallEmitter.test.ts | 148 ++++++++++++++++-- .../session/emitters/ToolCallEmitter.ts | 59 +++++++ packages/cli/src/serve/server.test.ts | 108 ++++++++++++- packages/cli/src/serve/server.ts | 35 ++++- 5 files changed, 337 insertions(+), 20 deletions(-) diff --git a/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts b/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts index 0188fdaa41..cf15142437 100644 --- a/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts +++ b/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts @@ -259,6 +259,11 @@ describe('HistoryReplayer', () => { rawInput: { path: '/test.ts' }, _meta: { toolName: 'read_file', + // #4175 F4 prereq — ToolCallEmitter now stamps provenance + // on every tool_call / tool_call_update event so the UI can + // dispatch on builtin / mcp / subagent without string- + // matching toolName. 
+ provenance: 'builtin', timestamp: toEpochMs(record.timestamp), }, }), @@ -314,6 +319,8 @@ describe('HistoryReplayer', () => { rawOutput: 'File contents here', _meta: { toolName: 'read_file', + // #4175 F4 prereq — provenance stamped on update events too. + provenance: 'builtin', timestamp: toEpochMs(record.timestamp), }, }); diff --git a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts index 6acc302221..4b2e8a3698 100644 --- a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts +++ b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts @@ -6,7 +6,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; import { ToolCallEmitter } from './ToolCallEmitter.js'; -import type { SessionContext } from '../types.js'; +import type { SessionContext, SubagentMeta } from '../types.js'; import type { Config, ToolRegistry, @@ -77,7 +77,7 @@ describe('ToolCallEmitter', () => { locations: [], kind: 'other', rawInput: { arg1: 'value1' }, - _meta: { toolName: 'unknown_tool' }, + _meta: { toolName: 'unknown_tool', provenance: 'builtin' }, }); }); @@ -101,7 +101,7 @@ describe('ToolCallEmitter', () => { locations: [{ path: '/test/file.ts', line: 10 }], kind: 'edit', rawInput: { path: '/test.ts' }, - _meta: { toolName: 'edit_file' }, + _meta: { toolName: 'edit_file', provenance: 'builtin' }, }); }); @@ -125,7 +125,7 @@ describe('ToolCallEmitter', () => { expect(sendUpdateSpy).toHaveBeenCalledWith( expect.objectContaining({ rawInput: {}, - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -153,7 +153,7 @@ describe('ToolCallEmitter', () => { locations: [], // Fallback to empty kind: 'other', // Fallback to other rawInput: { invalid: true }, - _meta: { toolName: 'failing_tool' }, + _meta: { toolName: 'failing_tool', provenance: 'builtin' }, }); }); }); @@ -174,7 +174,7 @@ 
describe('ToolCallEmitter', () => { toolCallId: 'call-123', status: 'completed', rawOutput: 'Tool completed successfully', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -198,7 +198,7 @@ describe('ToolCallEmitter', () => { content: { type: 'text', text: 'Something went wrong' }, }, ], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); @@ -228,7 +228,7 @@ describe('ToolCallEmitter', () => { newText: 'new content', }, ], - _meta: { toolName: 'edit_file' }, + _meta: { toolName: 'edit_file', provenance: 'builtin' }, }), ); }); @@ -263,7 +263,7 @@ describe('ToolCallEmitter', () => { }, }, ], - _meta: { toolName: 'edit_file' }, + _meta: { toolName: 'edit_file', provenance: 'builtin' }, }), ); }); @@ -289,7 +289,7 @@ describe('ToolCallEmitter', () => { }, ], rawOutput: 'raw output', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -307,7 +307,7 @@ describe('ToolCallEmitter', () => { toolCallId: 'call-empty', status: 'completed', content: [], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); @@ -399,7 +399,7 @@ describe('ToolCallEmitter', () => { content: { type: 'text', text: 'Connection timeout' }, }, ], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); }); @@ -543,7 +543,7 @@ describe('ToolCallEmitter', () => { }, ], rawOutput: { unknownField: 'value', nested: { data: 123 } }, - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -565,7 +565,7 @@ describe('ToolCallEmitter', () => { toolCallId: 'call-extra', status: 'completed', rawOutput: 'Result text', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -580,7 +580,10 @@ describe('ToolCallEmitter', () => { const call = 
sendUpdateSpy.mock.calls[0][0]; expect(call.rawOutput).toBeUndefined(); - expect(call._meta).toEqual({ toolName: 'test_tool' }); + expect(call._meta).toEqual({ + toolName: 'test_tool', + provenance: 'builtin', + }); }); }); @@ -671,7 +674,7 @@ describe('ToolCallEmitter', () => { content: { type: 'text', text: 'Text content from message' }, }, ], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); @@ -703,10 +706,121 @@ describe('ToolCallEmitter', () => { }, ], rawOutput: 'raw result', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); }); }); + + describe('resolveToolProvenance (#4175 F4 prereq, chiga0 #19 P0)', () => { + // Pure static utility — exercise without an emitter instance. + it('classifies a plain tool name as builtin (no serverId)', () => { + const out = ToolCallEmitter.resolveToolProvenance('shell'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies a tool name without mcp__ prefix as builtin', () => { + const out = ToolCallEmitter.resolveToolProvenance('read_file'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies mcp____ as mcp with serverId', () => { + const out = ToolCallEmitter.resolveToolProvenance( + 'mcp__filesystem__read', + ); + expect(out).toEqual({ provenance: 'mcp', serverId: 'filesystem' }); + }); + + it('preserves underscores in the tool segment', () => { + // Server segment is `playwright`; tool segment is `take_screenshot` + // (with underscore inside the tool name — `split("__")` handles + // this because we split on the double-underscore delimiter). 
+ const out = ToolCallEmitter.resolveToolProvenance( + 'mcp__playwright__take_screenshot', + ); + expect(out).toEqual({ provenance: 'mcp', serverId: 'playwright' }); + }); + + it('classifies malformed mcp__ prefix (only one segment) as builtin', () => { + // No double-underscore delimiter past the prefix → not a valid + // mcp tool name; fall back to builtin rather than stamping + // garbage serverId. + const out = ToolCallEmitter.resolveToolProvenance('mcp__just_one'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies mcp____ as builtin (empty server segment)', () => { + const out = ToolCallEmitter.resolveToolProvenance('mcp____read'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies any tool as subagent when subagentMeta is present', () => { + // subagent takes precedence over mcp__ naming — a sub-agent + // calling an MCP tool is rendered as "subagent block" not + // "MCP block" in the UI. + const out = ToolCallEmitter.resolveToolProvenance('mcp__fs__read', { + agentType: 'researcher', + } as unknown as SubagentMeta); + expect(out).toEqual({ provenance: 'subagent' }); + }); + + it('classifies a plain builtin tool with subagentMeta as subagent', () => { + const out = ToolCallEmitter.resolveToolProvenance('shell', { + agentType: 'coder', + } as unknown as SubagentMeta); + expect(out).toEqual({ provenance: 'subagent' }); + }); + }); + + describe('provenance stamping on emit (#4175 F4 prereq)', () => { + it('stamps provenance:mcp + serverId on emitStart for mcp__ tools', async () => { + await emitter.emitStart({ + toolName: 'mcp__github__create_issue', + callId: 'call-mcp', + args: { title: 'bug' }, + }); + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'tool_call', + _meta: expect.objectContaining({ + toolName: 'mcp__github__create_issue', + provenance: 'mcp', + serverId: 'github', + }), + }), + ); + }); + + it('stamps provenance:subagent (no serverId) when subagentMeta 
present', async () => { + await emitter.emitStart({ + toolName: 'shell', + callId: 'call-sub', + args: {}, + subagentMeta: { agentType: 'researcher' } as unknown as SubagentMeta, + }); + const call = sendUpdateSpy.mock.calls[0][0]; + expect(call._meta.provenance).toBe('subagent'); + expect(call._meta.serverId).toBeUndefined(); + }); + + it('stamps provenance on emitResult so reconnecting clients can re-derive it', async () => { + await emitter.emitResult({ + toolName: 'mcp__db__query', + callId: 'call-r', + success: true, + message: [], + }); + const call = sendUpdateSpy.mock.calls[0][0]; + expect(call._meta.provenance).toBe('mcp'); + expect(call._meta.serverId).toBe('db'); + }); + + it('stamps provenance on emitError as well', async () => { + await emitter.emitError('call-e', 'mcp__fs__write', new Error('boom')); + const call = sendUpdateSpy.mock.calls[0][0]; + expect(call._meta.provenance).toBe('mcp'); + expect(call._meta.serverId).toBe('fs'); + }); + }); }); diff --git a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts index 92f66ee474..6f3c042c5a 100644 --- a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts +++ b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts @@ -57,6 +57,10 @@ export class ToolCallEmitter extends BaseEmitter { params.toolName, params.args, ); + const provenance = ToolCallEmitter.resolveToolProvenance( + params.toolName, + params.subagentMeta, + ); await this.sendUpdate({ sessionUpdate: 'tool_call', @@ -70,6 +74,8 @@ export class ToolCallEmitter extends BaseEmitter { _meta: { toolName: params.toolName, ...params.subagentMeta, + provenance: provenance.provenance, + ...(provenance.serverId ? 
{ serverId: provenance.serverId } : {}), ...(BaseEmitter.toEpochMs(params.timestamp) != null && { timestamp: BaseEmitter.toEpochMs(params.timestamp), }), @@ -124,6 +130,10 @@ export class ToolCallEmitter extends BaseEmitter { } // Build the update + const provenance = ToolCallEmitter.resolveToolProvenance( + params.toolName, + params.subagentMeta, + ); const update: Parameters[0] = { sessionUpdate: 'tool_call_update', toolCallId: params.callId, @@ -132,6 +142,8 @@ export class ToolCallEmitter extends BaseEmitter { _meta: { toolName: params.toolName, ...params.subagentMeta, + provenance: provenance.provenance, + ...(provenance.serverId ? { serverId: provenance.serverId } : {}), ...(BaseEmitter.toEpochMs(params.timestamp) != null && { timestamp: BaseEmitter.toEpochMs(params.timestamp), }), @@ -161,6 +173,10 @@ export class ToolCallEmitter extends BaseEmitter { error: Error, subagentMeta?: SubagentMeta, ): Promise { + const provenance = ToolCallEmitter.resolveToolProvenance( + toolName, + subagentMeta, + ); await this.sendUpdate({ sessionUpdate: 'tool_call_update', toolCallId: callId, @@ -171,10 +187,53 @@ export class ToolCallEmitter extends BaseEmitter { _meta: { toolName, ...subagentMeta, + provenance: provenance.provenance, + ...(provenance.serverId ? { serverId: provenance.serverId } : {}), }, }); } + /** + * Resolve a tool's provenance for UI dispatch on tool_call events + * (#4175 F4 prereq, chiga0 issue #19 P0). The SDK reads `_meta. + * provenance` + `_meta.serverId` to render builtin / MCP-server-badge / + * subagent-block differently. Without this stamping, the SDK falls + * back to string-matching the toolName which can't reliably + * distinguish builtin from subagent. + * + * Resolution rules: + * - `subagentMeta` present → `'subagent'` (a Task tool / Codex + * subagent / etc. wrapping its own tool calls) + * - toolName matches `mcp____` → `'mcp'` with + * `serverId: `. 
Naming convention from `@qwen-code/core/ + * mcp-tool` — mirrors the SDK's same heuristic fallback so SDK + * consumers stay consistent with daemon classification. + * - everything else → `'builtin'` + * + * Static + pure so it can be unit-tested without an emitter + * instance. Exported via `ToolCallEmitter.resolveToolProvenance`. + */ + static resolveToolProvenance( + toolName: string, + subagentMeta?: SubagentMeta, + ): { provenance: 'builtin' | 'mcp' | 'subagent'; serverId?: string } { + if (subagentMeta !== undefined) { + return { provenance: 'subagent' }; + } + if (toolName.startsWith('mcp__')) { + // mcp____ — split is "__", not single "_", + // so server / tool segments can contain underscores. Require + // both a non-empty server segment and at least one segment past + // it; malformed names fall through to 'builtin' rather than + // stamping an empty/garbage serverId. + const parts = toolName.split('__'); + if (parts.length >= 3 && parts[1] && parts[1].length > 0) { + return { provenance: 'mcp', serverId: parts[1] }; + } + } + return { provenance: 'builtin' }; + } + // ==================== Public Utilities ==================== /** diff --git a/packages/cli/src/serve/server.test.ts b/packages/cli/src/serve/server.test.ts index 4567acebed..636719d5c1 100644 --- a/packages/cli/src/serve/server.test.ts +++ b/packages/cli/src/serve/server.test.ts @@ -4358,7 +4358,10 @@ describe('GET /session/:id/events (SSE)', () => { expect(frames).toHaveLength(2); expect(frames[0]?.id).toBe('1'); expect(frames[0]?.event).toBe('session_update'); - expect(JSON.parse(frames[0]!.data!)).toEqual({ + // `toMatchObject` rather than `toEqual` because the SSE write + // boundary stamps `_meta.serverTimestamp` (#4175 F4 prereq); + // a dedicated test below pins that field's shape. 
+ expect(JSON.parse(frames[0]!.data!)).toMatchObject({ id: 1, v: 1, type: 'session_update', @@ -4367,6 +4370,80 @@ describe('GET /session/:id/events (SSE)', () => { expect(frames[1]?.id).toBe('2'); }); + it('stamps _meta.serverTimestamp on every SSE frame (#4175 F4 prereq, chiga0 #19 P0)', async () => { + // The daemon stamps `_meta.serverTimestamp` at the SSE write + // boundary so multi-client UIs use the server clock for transcript + // ordering / "X minutes ago" instead of each client's drifting + // local clock. The chiga0 SDK PR #4353 reads this via a 3- + // location probe (`event.serverTimestamp` / `event._meta. + // serverTimestamp` / `event.data._meta.serverTimestamp`); we + // pick `_meta.serverTimestamp` (Anthropic convention) so the + // top-level event type stays unpolluted. + const before = Date.now(); + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + id: 1, + v: 1, + type: 'session_update', + data: { foo: 'bar' }, + }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + const frames = await readSseFrames(res.body!, 1); + const after = Date.now(); + + const parsed = JSON.parse(frames[0]!.data!); + expect(parsed._meta).toBeDefined(); + expect(typeof parsed._meta.serverTimestamp).toBe('number'); + // Server clock at stamp time must fall within the test's + // before/after wall-clock window. + expect(parsed._meta.serverTimestamp).toBeGreaterThanOrEqual(before); + expect(parsed._meta.serverTimestamp).toBeLessThanOrEqual(after); + }); + + it('preserves pre-existing _meta keys when stamping serverTimestamp', async () => { + // ToolCallEmitter (and other emitters) attach `_meta.toolName` etc. 
+ // The SSE boundary stamp must MERGE (not overwrite) so downstream + // consumers keep both fields. `BridgeEvent` doesn't type `_meta` + // explicitly (it's a wire-only escape hatch) so we cast the yield. + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + id: 1, + v: 1, + type: 'session_update', + data: { sessionUpdate: 'tool_call', toolCallId: 't1' }, + // Pre-existing _meta on the event (mimics ToolCallEmitter). + _meta: { toolName: 'Read', timestamp: 1234567890 }, + } as unknown as { id: 1; v: 1; type: 'session_update'; data: unknown }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + const frames = await readSseFrames(res.body!, 1); + + const parsed = JSON.parse(frames[0]!.data!); + // Both the pre-existing _meta keys AND serverTimestamp must survive. + expect(parsed._meta.toolName).toBe('Read'); + expect(parsed._meta.timestamp).toBe(1234567890); + expect(typeof parsed._meta.serverTimestamp).toBe('number'); + }); + it('forwards Last-Event-ID to the bridge', async () => { const seen: number[] = []; const bridge = fakeBridge({ @@ -4574,9 +4651,38 @@ describe('GET /session/:id/events (SSE)', () => { // it doesn't pollute the per-session monotonic sequence used for // Last-Event-ID resume. expect(frames[1]?.id).toBeUndefined(); + // `Error('agent died')` isn't classified by `mapDomainErrorToErrorKind` + // (no Bridge*Error class, no errno code, no special name), so no + // `errorKind` is stamped — only `error`. The next test covers the + // classified-error path. 
expect(JSON.parse(frames[1]!.data!).data).toEqual({ error: 'agent died' }); }); + it('stamps errorKind on stream_error when the thrown error is classified (#4175 F4 prereq, chiga0 #19 P0)', async () => { + // BridgeTimeoutError → `init_timeout` per mapDomainErrorToErrorKind. + // UI consumers can render "retry" on init_timeout vs "show stack + // trace" on unknown errors, without regex-matching the message + // string. + const { BridgeTimeoutError } = await import('@qwen-code/acp-bridge'); + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { id: 1, v: 1, type: 'session_update', data: 'first' }; + throw new BridgeTimeoutError('initialize timed out'); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + const frames = await readSseFrames(res.body!, 2); + expect(frames[1]?.event).toBe('stream_error'); + const parsed = JSON.parse(frames[1]!.data!); + expect(parsed.data.errorKind).toBe('init_timeout'); + expect(parsed.data.error).toContain('timed out'); + }); + it('forwards numeric Last-Event-ID even when supplied as a string', async () => { let seen: number | undefined; const bridge = fakeBridge({ diff --git a/packages/cli/src/serve/server.ts b/packages/cli/src/serve/server.ts index 4a2bb10616..808d2d9729 100644 --- a/packages/cli/src/serve/server.ts +++ b/packages/cli/src/serve/server.ts @@ -27,6 +27,7 @@ import { type DeviceFlowProviderId, type DeviceFlowPublicView, } from './auth/deviceFlow.js'; +import { mapDomainErrorToErrorKind } from '@qwen-code/acp-bridge'; import { QwenOAuthDeviceFlowProvider } from './auth/qwenDeviceFlowProvider.js'; import { createBridgeFileSystemAdapter } from './bridgeFileSystemAdapter.js'; import { createDaemonStatusProvider } from './daemonStatusProvider.js'; @@ -1943,11 +1944,23 @@ export function 
createServeApp( // hard-coded `id: 0` would regress the client's `Last-Event-ID` // tracker. `formatSseFrame` omits the `id:` line when the input // event has no id. + // + // `errorKind` (#4175 F4 prereq, chiga0 issue #19 P0): stamp the + // classified error kind so UIs can render typed responses + // (auth retry / file picker / proxy hint / etc.) rather than + // regex-matching the human-readable `error` string. Returns + // `undefined` for unclassified errors — SDK falls back to + // rendering `error` text as before, so adding `errorKind` is + // strictly additive / backward-compatible. + const errorKind = mapDomainErrorToErrorKind(err); await writeWithBackpressure( formatSseFrame({ v: 1, type: 'stream_error', - data: { error: errorMessage(err) }, + data: { + error: errorMessage(err), + ...(errorKind ? { errorKind } : {}), + }, }), ).catch(() => {}); } @@ -2540,7 +2553,25 @@ function formatSseFrame(event: BridgeEvent | OmitId): string { // The SDK parser at `sdk-typescript/src/daemon/sse.ts` handles the // multi-line variant on the receive side — input/output asymmetry is // intentional. - const dataJson = JSON.stringify(event); + // + // `_meta.serverTimestamp` (#4175 F4 prereq, chiga0 issue #19 P0): stamp + // the daemon's wall-clock at SSE write time so multi-client transcript + // ordering and "X minutes ago" UIs use the server's clock rather than + // each client's drifting local clock. Stamped at the wire boundary + // (NOT at EventBus.publish) so the in-memory `BridgeEvent` type stays + // unchanged and internal consumers don't see `_meta`. Pre-existing + // `_meta` keys on the event (e.g. tool_call carries `_meta.toolName` / + // `_meta.timestamp` from ToolCallEmitter) are preserved by spreading + // first. SDK consumers read via the 3-location probe in + // `extractServerTimestamp` (sdk-typescript) — `_meta.serverTimestamp` + // is one of the supported locations; the other two stay free for + // future use. 
+ const existingMeta = (event as { _meta?: Record })._meta; + const stamped = { + ...event, + _meta: { ...(existingMeta ?? {}), serverTimestamp: Date.now() }, + }; + const dataJson = JSON.stringify(stamped); const idLine = 'id' in event && event.id !== undefined ? `id: ${event.id}\n` : ''; return `${idLine}event: ${event.type}\ndata: ${dataJson}\n\n`; From c1a2f0a78fdfa8cbecd8ce8fcf1ada2809c41e87 Mon Sep 17 00:00:00 2001 From: doudouOUC Date: Wed, 20 May 2026 20:44:43 +0800 Subject: [PATCH 2/6] feat(serve+sdk): detect SSE ring eviction on resume, expose state_resync_required (#4175 F4 prereq) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the multi-client SSE reducer divergence bug Ilya0527 raised in #4175 comment #15. Pre-fix scenario: 1. Consumer's SSE stream drops; client buffers `Last-Event-ID: N`. 2. Network reconnects long enough later that events `[N+1, ringHead-1]` were evicted from the daemon's per-session ring. 3. Daemon's `subscribe({lastEventId: N})` silently replays only the surviving suffix. 4. Consumer's SDK reducer keeps applying deltas as if the stream was contiguous. Its state has now drifted from the daemon's truth — no terminal signal, no warning. The `SessionState` reducer's "same event stream in → same state out" purity guarantee is broken. The bug's blast radius is exactly when multi-client matters: F4 brings up the TUI / IDE / web client adapters that share session state, so divergence becomes visibly inconsistent across clients. **Daemon side** (`packages/acp-bridge/src/eventBus.ts`) In `subscribe()`'s replay path, detect ring eviction by comparing the ring's earliest id against `lastEventId + 1`. 
When a gap exists, force-push a synthetic terminal `state_resync_required` frame BEFORE the surviving replay events: ``` { v: 1, type: 'state_resync_required', data: { reason: 'ring_evicted', lastDeliveredId: N, earliestAvailableId: M } } ``` Per user-confirmed design (issue comment discussion): the frame has NO `id` (mirrors the `client_evicted` synthetic terminal pattern so it doesn't burn a slot in the per-session monotonic sequence). Replay continues after the resync frame — the SDK reducer auto-skips subsequent deltas (see below) but the frames stay on the wire so adapters have the option to compute a "what you missed" diff later. **SDK side** (`packages/sdk-typescript/src/daemon/events.ts`) Adds: - `'state_resync_required'` to `DAEMON_EVENT_TYPES` union - `DaemonStateResyncRequiredData` + `DaemonStateResyncRequiredEvent` - `isStateResyncRequiredData` predicate - `DaemonStreamLifecycleEvent` union widened - Reducer state fields: `awaitingResync: boolean`, `resyncRequiredCount: number`, `lastResyncRequired?` - Reducer case for `state_resync_required` — sets the flag, increments count, records data - **Top-of-reducer gate**: when `awaitingResync === true`, all non-terminal events are auto-skipped (they still advance `lastEventId`). Terminal lifecycle events (`session_died` / `session_closed` / `client_evicted` / `stream_error`) STILL apply — critical end-of-stream signals don't depend on prior state being current. - Re-exported `DaemonStateResyncRequiredData` / `DaemonStateResyncRequiredEvent` from `daemon/index.ts` and `src/index.ts` (matches surface posture of sibling lifecycle types). Consumer recovery contract: when `state.awaitingResync === true`, call `loadSession` (out of band) to fetch the daemon's canonical session snapshot, then reconstruct view state via `createDaemonSessionViewState({...seed from loaded state})`. The fresh state defaults `awaitingResync: false` so the seed implicitly clears the flag.
**Side fix** (`stream_error` errorKind) `DaemonStreamErrorData.errorKind?: string` typed for the optional classification field that Commit 1 (`14637cd79`) added daemon-side. Strictly additive — old daemons omit the field, SDK falls back to rendering `error` text. Verification - `packages/acp-bridge`: 6 files, 108/108 pass (+5 new resync-detection tests; 1 existing "default ring size 8000" test updated to acknowledge the synthetic resync frame at the head of its replay batch). - `packages/sdk-typescript`: 13 files, 451/451 pass (+8 new reducer resync tests covering set/skip/terminal-passthrough/recovery/ repeated-resync/malformed-payload). - TypeScript clean across both packages on touched regions. --- packages/acp-bridge/src/eventBus.test.ts | 145 ++++++++++++- packages/acp-bridge/src/eventBus.ts | 39 ++++ packages/sdk-typescript/src/daemon/events.ts | 172 ++++++++++++++- packages/sdk-typescript/src/daemon/index.ts | 4 + packages/sdk-typescript/src/index.ts | 4 + .../test/unit/daemonEvents.test.ts | 200 ++++++++++++++++++ 6 files changed, 560 insertions(+), 4 deletions(-) diff --git a/packages/acp-bridge/src/eventBus.test.ts b/packages/acp-bridge/src/eventBus.test.ts index 029526b8f8..271219fc20 100644 --- a/packages/acp-bridge/src/eventBus.test.ts +++ b/packages/acp-bridge/src/eventBus.test.ts @@ -274,13 +274,21 @@ describe('EventBus', () => { // A `lastEventId: 0` resume with a queue cap larger than the ring // collects exactly 8000 live frames; ids start at 2 because id=1 // was the one shifted out of the ring. + // + // #4175 F4 prereq: `lastEventId: 0` + earliest-id-in-ring = 2 + // crosses the eviction-detection threshold (earliest > last + 1), + // so an extra synthetic `state_resync_required` frame is emitted + // FIRST. The filter below restricts to live ids, which excludes + // the synthetic (no id), so the original "8000 live frames" + // invariant is preserved. 
const abort = new AbortController(); const iter = bus.subscribe({ lastEventId: 0, maxQueued: 9000, signal: abort.signal, }); - const events = await collect(iter, 8000); + // Collect 8001 frames now: 1 synthetic resync + 8000 live. + const events = await collect(iter, 8001); abort.abort(); const liveIds = events .filter((e) => e.id !== undefined) @@ -288,6 +296,8 @@ describe('EventBus', () => { expect(liveIds).toHaveLength(8000); expect(liveIds[0]).toBe(2); expect(liveIds[liveIds.length - 1]).toBe(8001); + // The synthetic resync frame is the first one. + expect(events[0]?.type).toBe('state_resync_required'); }); it('eviction detaches the abort listener from a stalled consumer (BmJT1)', async () => { @@ -480,9 +490,138 @@ describe('EventBus', () => { const out: BridgeEvent[] = []; for await (const e of iter) { out.push(e); - if (out.length === 3) break; + // After the state_resync_required frame (synthetic) + 3 replay + // frames, we're done. + if (out.length === 4) break; } - expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + // First frame is the synthetic state_resync_required (no id). + expect(out[0]?.type).toBe('state_resync_required'); + expect(out[0]?.id).toBeUndefined(); + // Then the 3 surviving ring frames. + expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]); abort.abort(); }); + + describe('state_resync_required (#4175 F4 prereq, Ilya0527 issue #15)', () => { + it('emits state_resync_required when lastEventId is past the ring head', async () => { + // Setup: ring holds 3, ids 1..5 published → ring contains [3,4,5]. + // Consumer reconnects with Last-Event-ID: 1 → event 2 was evicted. + // Daemon must emit state_resync_required FIRST so SDK reducer + // knows its state is stale before applying any replay frames. 
+ const bus = new EventBus(3); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 1, + signal: abort.signal, + }); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 4) break; + } + // First frame is the resync terminal (synthetic, no id). + expect(out[0]?.type).toBe('state_resync_required'); + expect(out[0]?.id).toBeUndefined(); + const data = out[0]?.data as { + reason: string; + lastDeliveredId: number; + earliestAvailableId: number; + }; + expect(data.reason).toBe('ring_evicted'); + expect(data.lastDeliveredId).toBe(1); + expect(data.earliestAvailableId).toBe(3); // event 2 was evicted + // Replay continues after the resync frame (per design — SDK can + // compute "what you missed" diff later) — so we still get the + // 3 surviving ring frames. + expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]); + abort.abort(); + }); + + it('does NOT emit state_resync_required when lastEventId is in the ring', async () => { + // Consumer's lastEventId is well within the ring → no gap → no + // resync needed. + const bus = new EventBus(10); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 2, + signal: abort.signal, + }); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 3) break; + } + // No resync frame — just the 3 replay frames (ids 3, 4, 5). + expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); + abort.abort(); + }); + + it('does NOT emit state_resync_required at the exact boundary (lastEventId === earliest - 1)', async () => { + // Boundary: ring's earliest id is N, lastEventId is N-1. + // No gap → no resync. Off-by-one guard. 
+ const bus = new EventBus(3); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + // Ring is now [3, 4, 5]. lastEventId=2 means "I have 1 and 2"; + // next expected is 3, which IS in the ring. No gap. + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 2, + signal: abort.signal, + }); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 3) break; + } + expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); + expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + abort.abort(); + }); + + it('does NOT emit state_resync_required when ring is empty', async () => { + // No publishes yet → earliestInRing is undefined → resync check + // skipped. Subscriber just waits for live events. + const bus = new EventBus(10); + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 5, + signal: abort.signal, + }); + // Publish one live event AFTER subscribe to confirm the stream works. + setTimeout(() => bus.publish({ type: 'foo', data: 1 }), 0); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 1) break; + } + // No resync frame — just the one live event. + expect(out[0]?.type).toBe('foo'); + expect(out[0]?.id).toBe(1); + abort.abort(); + }); + + it('does NOT emit state_resync_required when no lastEventId is provided (fresh subscribe)', async () => { + // First-time subscriber has no prior state to resync — resync + // would be meaningless. Check the no-lastEventId branch is + // skipped entirely. + const bus = new EventBus(3); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + const abort = new AbortController(); + const iter = bus.subscribe({ signal: abort.signal }); + // Live-only — publish one event after subscribe to give the + // iterator something to yield. 
+ setTimeout(() => bus.publish({ type: 'foo', data: 99 }), 0); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 1) break; + } + expect(out[0]?.type).toBe('foo'); + expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); + abort.abort(); + }); + }); }); diff --git a/packages/acp-bridge/src/eventBus.ts b/packages/acp-bridge/src/eventBus.ts index 861e02fbc1..ce6b024d49 100644 --- a/packages/acp-bridge/src/eventBus.ts +++ b/packages/acp-bridge/src/eventBus.ts @@ -357,6 +357,45 @@ export class EventBus { this.subs.add(sub); if (opts.lastEventId !== undefined) { + // Detect ring eviction on resume (#4175 F4 prereq, Ilya0527 + // issue #15): if the earliest event still in the ring has + // `id > lastEventId + 1`, then events between `lastEventId + 1` + // and `earliestInRing - 1` were evicted before the consumer + // reconnected — the consumer's reducer has a gap it doesn't + // know about. Pre-fix the resume silently succeeded ("you + // caught up!") even though the SDK reducer's state was now + // diverged from the daemon's truth. + // + // Emit `state_resync_required` as a TERMINAL synthetic frame + // (no `id` — same pattern as `client_evicted` so it doesn't + // burn a slot in the per-session monotonic sequence other + // subscribers observe). The SDK reducer treats this as "your + // state is stale; call loadSession before applying any further + // deltas" — see `awaitingResync` flag in the SDK reducer. + // + // Replay continues after the resync frame (per design): the + // SDK reducer will auto-skip delta application until + // loadSession clears the flag, but the frames stay on the + // wire so SDK has the option to compute a "what you missed" + // diff later. This is network-friendly (no extra reconnect) + // and matches the existing `client_evicted` semantics + // (terminal-on-close, but consumer can still drain queued + // frames first). 
+ const earliestInRing = this.ring[0]?.id; + if ( + earliestInRing !== undefined && + earliestInRing > opts.lastEventId + 1 + ) { + queue.forcePush({ + v: EVENT_SCHEMA_VERSION, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: opts.lastEventId, + earliestAvailableId: earliestInRing, + }, + }); + } // Force-push replay frames so they bypass the per-subscriber size // cap. The cap protects against a slow live consumer; replay is // already historical and silently dropping it would undermine the diff --git a/packages/sdk-typescript/src/daemon/events.ts b/packages/sdk-typescript/src/daemon/events.ts index a3a2d7bf2c..e37e4065b1 100644 --- a/packages/sdk-typescript/src/daemon/events.ts +++ b/packages/sdk-typescript/src/daemon/events.ts @@ -23,6 +23,18 @@ const DAEMON_KNOWN_EVENT_TYPE_VALUES = [ 'client_evicted', 'slow_client_warning', 'stream_error', + // #4175 F4 prereq (Ilya0527 issue #15) — terminal-style frame + // emitted by the daemon when an SSE consumer reconnects with a + // `Last-Event-ID` past the ring's earliest available id (the + // events between the consumer's last-seen id and the ring head + // were evicted before reconnect). The reducer treats this as + // "your accumulated state is stale; call `loadSession` and + // reseed view state before applying any further deltas". It does + // NOT close the stream — the daemon continues replaying surviving + // ring frames and live frames, but the reducer auto-skips them + // until the consumer reseeds state. Synthetic (no `id`) so it + // doesn't burn a slot in the per-session monotonic sequence. + 'state_resync_required', // PR 14b — MCP guardrail push events. See `mcp_guardrail_events` // capability tag. 
Both fire on the per-session SSE bus; consumers // should pre-flight `caps.features.includes('mcp_guardrail_events')` @@ -218,6 +230,44 @@ export interface DaemonSlowClientWarningData { export interface DaemonStreamErrorData { error: string; + /** + * #4175 F4 prereq (chiga0 issue #19 P0). Classified error kind from + * the daemon's `mapDomainErrorToErrorKind` — one of the 7-value + * closed enum (`init_timeout` / `protocol_error` / `missing_binary` + * / `auth_env_error` / `blocked_egress` / `missing_file` / + * `parse_error`). Absent for unclassified errors — the daemon omits + * the field rather than stamping a meaningless value. UI consumers + * key on this for typed retry / remediation rendering (retry on + * init_timeout vs install on missing_binary, etc.) instead of + * regex-matching the `error` string. + */ + errorKind?: string; + [key: string]: unknown; +} + +/** + * #4175 F4 prereq (Ilya0527 issue #15). Payload for the + * `state_resync_required` synthetic frame the daemon emits when an + * SSE consumer reconnects with a `Last-Event-ID` past the ring's + * earliest available id. The reducer auto-skips subsequent delta + * frames until consumer code calls `loadSession` and reseeds view + * state — see `DaemonSessionViewState.awaitingResync`. + */ +export interface DaemonStateResyncRequiredData { + /** + * Machine-readable resync reason. Currently always `'ring_evicted'` + * (the only case the daemon emits this frame for); reserved for + * future causes (e.g. `'schema_version_bump'`). + */ + reason: string; + /** Consumer's `Last-Event-ID` at reconnect time. */ + lastDeliveredId: number; + /** + * The earliest event id still in the daemon's per-session ring at + * reconnect time. The gap is `[lastDeliveredId + 1, + * earliestAvailableId - 1]` inclusive. 
+ */ + earliestAvailableId: number; [key: string]: unknown; } @@ -525,6 +575,10 @@ export type DaemonStreamErrorEvent = DaemonEventEnvelope< 'stream_error', DaemonStreamErrorData >; +export type DaemonStateResyncRequiredEvent = DaemonEventEnvelope< + 'state_resync_required', + DaemonStateResyncRequiredData +>; export type DaemonMcpBudgetWarningEvent = DaemonEventEnvelope< 'mcp_budget_warning', DaemonMcpBudgetWarningData @@ -613,7 +667,8 @@ export type DaemonControlEvent = export type DaemonStreamLifecycleEvent = | DaemonClientEvictedEvent | DaemonSlowClientWarningEvent - | DaemonStreamErrorEvent; + | DaemonStreamErrorEvent + | DaemonStateResyncRequiredEvent; /** * PR 14b: MCP guardrail push events. Grouped as their own union member @@ -763,6 +818,38 @@ export interface DaemonSessionViewState { * `forbiddenVotes`). */ forbiddenVoteCount: number; + /** + * #4175 F4 prereq (Ilya0527 issue #15). Set to true when the + * reducer observes a `state_resync_required` frame from the daemon + * (consumer reconnected with `Last-Event-ID` past the daemon's + * ring eviction point — events between last-delivered and ring- + * head were lost, so the accumulated view state is stale relative + * to the daemon's truth). + * + * While true, the reducer **auto-skips** all non-terminal delta + * events (still advances `lastEventId`) to prevent the consumer + * from rendering against a known-stale state. Terminal lifecycle + * events (`session_died` / `session_closed` / `client_evicted` / + * `stream_error`) still apply because they're critical end-of- + * stream signals that don't depend on prior state being current. + * + * Consumer recovery: when this is true, call `loadSession` to + * fetch the daemon's canonical session snapshot, then reconstruct + * view state via `createDaemonSessionViewState({...loaded state})`. + * The fresh state seed clears the flag implicitly (a new reducer + * instance starts fresh). 
+ */ + awaitingResync: boolean; + /** + * Count of `state_resync_required` frames this stream has observed. + * Typically 0 (no resync) or 1 (single ring-eviction event); + * higher counts indicate the consumer is reconnecting repeatedly + * past the ring boundary, which is itself a debuggable signal + * (network instability or ring sizing wrong for the workload). + */ + resyncRequiredCount: number; + /** Most recent resync payload (reason + gap range). */ + lastResyncRequired?: DaemonStateResyncRequiredData; } /** @@ -777,6 +864,34 @@ export interface DaemonSessionViewState { */ const MAX_FORBIDDEN_VOTES_PER_SESSION = 32; +/** + * #4175 F4 prereq (Ilya0527 issue #15). Event types that the reducer + * still processes when `awaitingResync` is true. Two categories: + * + * - **`state_resync_required` itself** — so the reducer can update + * `lastResyncRequired` / `resyncRequiredCount` for *subsequent* + * resync frames (rare but possible: a consumer that reconnects + * past the ring twice in succession). + * - **Terminal lifecycle frames** — `session_died` / `session_closed` + * / `client_evicted` / `stream_error`. Critical end-of-stream + * signals that don't depend on prior state being current. UIs + * must still see "this session died" even if they were in resync + * limbo at the time. + * + * Everything else (session_update / permission_* / approval_mode_changed + * / workspace mutations / mcp guardrail / auth flow events) is auto- + * skipped while `awaitingResync` is true; `lastEventId` still advances + * via `advanceLastEventId(base)` so the resync recovery sequence stays + * monotonic. 
+ */ +const RESYNC_PASSTHROUGH_TYPES = new Set([ + 'state_resync_required', + 'session_died', + 'session_closed', + 'client_evicted', + 'stream_error', +]); + export function createDaemonSessionViewState( seed: Partial = {}, ): DaemonSessionViewState { @@ -821,6 +936,14 @@ export function createDaemonSessionViewState( permissionVoteProgress: { ...seed.permissionVoteProgress }, forbiddenVotes: seed.forbiddenVotes ? [...seed.forbiddenVotes] : [], forbiddenVoteCount: seed.forbiddenVoteCount ?? 0, + // #4175 F4 prereq (Ilya0527 issue #15) — view state starts without + // a resync requirement unless the seed supplies one. A consumer + // calling `createDaemonSessionViewState` after `loadSession` clears + // the flag through this default, provided the seed does not carry + // `awaitingResync` over from the stale state. + awaitingResync: seed.awaitingResync ?? false, + resyncRequiredCount: seed.resyncRequiredCount ?? 0, + lastResyncRequired: seed.lastResyncRequired, }; } @@ -898,6 +1021,10 @@ export function asKnownDaemonEvent( return isStreamErrorData(event.data) ? (event as DaemonStreamErrorEvent) : undefined; + case 'state_resync_required': + return isStateResyncRequiredData(event.data) + ? (event as DaemonStateResyncRequiredEvent) + : undefined; case 'mcp_budget_warning': return isMcpBudgetWarningData(event.data) ? (event as DaemonMcpBudgetWarningEvent) @@ -974,6 +1101,20 @@ export function reduceDaemonSessionEvent( }; } + // #4175 F4 prereq (Ilya0527 issue #15). When `awaitingResync` is + // set, the consumer's accumulated view state is known stale — + // the daemon's ring evicted events between the consumer's last + // delivered id and reconnect. Auto-skip non-terminal delta events + // (still advance `lastEventId` via `base`) so the consumer doesn't + // render against stale state. Terminal lifecycle events still + // apply — they're critical end-of-stream signals that don't + // depend on prior state. 
The flag clears when the consumer calls + // `loadSession` and reconstructs view state via + // `createDaemonSessionViewState`. + if (base.awaitingResync && !RESYNC_PASSTHROUGH_TYPES.has(event.type)) { + return base; + } + switch (event.type) { case 'session_update': return { @@ -1169,6 +1310,24 @@ export function reduceDaemonSessionEvent( forbiddenVotes: [], forbiddenVoteCount: 0, }; + case 'state_resync_required': + // #4175 F4 prereq (Ilya0527 issue #15). Mark the accumulated + // view state as stale; subsequent non-terminal deltas are + // auto-skipped at the top-of-reducer gate above until consumer + // recovery via `loadSession` + `createDaemonSessionViewState`. + // `alive` and `terminalEvent` are NOT touched — the stream is + // still healthy; only the consumer's local accumulation is + // suspect. `pendingPermissions` is intentionally preserved + // (cleared by `loadSession`-driven recovery, not by the + // resync signal itself) so we don't synthesize a no-op + // "permission no longer pending" state transition while the + // consumer is still figuring out what's real. + return { + ...base, + awaitingResync: true, + resyncRequiredCount: base.resyncRequiredCount + 1, + lastResyncRequired: event.data, + }; case 'mcp_budget_warning': // Non-terminal: budget pressure is a status signal, not a stream // close. 
Count + capture latest so adapters can render @@ -1682,6 +1841,17 @@ function isClientEvictedData(value: unknown): value is DaemonClientEvictedData { ); } +function isStateResyncRequiredData( + value: unknown, +): value is DaemonStateResyncRequiredData { + return ( + isRecord(value) && + isNonEmptyString(value['reason']) && + isFiniteNumber(value['lastDeliveredId']) && + isFiniteNumber(value['earliestAvailableId']) + ); +} + function isSlowClientWarningData( value: unknown, ): value is DaemonSlowClientWarningData { diff --git a/packages/sdk-typescript/src/daemon/index.ts b/packages/sdk-typescript/src/daemon/index.ts index 51d2818978..bbfd6099a7 100644 --- a/packages/sdk-typescript/src/daemon/index.ts +++ b/packages/sdk-typescript/src/daemon/index.ts @@ -51,6 +51,10 @@ export type { DaemonClientEvictedData, DaemonClientEvictedEvent, DaemonControlEvent, + // #4175 F4 prereq (Ilya0527 issue #15) — daemon-emitted resync + // signal for SSE reconnects past the ring eviction boundary. + DaemonStateResyncRequiredData, + DaemonStateResyncRequiredEvent, DaemonMcpServerRestartedData, DaemonMcpServerRestartedEvent, DaemonMcpServerRestartRefusedData, diff --git a/packages/sdk-typescript/src/index.ts b/packages/sdk-typescript/src/index.ts index 58f32f086f..40049a0c90 100644 --- a/packages/sdk-typescript/src/index.ts +++ b/packages/sdk-typescript/src/index.ts @@ -44,6 +44,10 @@ export { type DaemonErrorKind, type DaemonClientEvictedData, type DaemonClientEvictedEvent, + // #4175 F4 prereq (Ilya0527 issue #15) — daemon-emitted resync + // signal for SSE reconnects past the ring eviction boundary. 
+ type DaemonStateResyncRequiredData, + type DaemonStateResyncRequiredEvent, type DaemonClientOptions, type DaemonContentHash, type DaemonControlEvent, diff --git a/packages/sdk-typescript/test/unit/daemonEvents.test.ts b/packages/sdk-typescript/test/unit/daemonEvents.test.ts index fce4bc668f..c04d4e16cd 100644 --- a/packages/sdk-typescript/test/unit/daemonEvents.test.ts +++ b/packages/sdk-typescript/test/unit/daemonEvents.test.ts @@ -2192,4 +2192,204 @@ describe('PR 21 — auth device-flow events', () => { expect(state.lastEventId).toBe(99); }); }); + + describe('state_resync_required (#4175 F4 prereq, Ilya0527 issue #15)', () => { + it('sets awaitingResync + records the resync data when daemon emits state_resync_required', () => { + const state = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }); + expect(state.awaitingResync).toBe(true); + expect(state.resyncRequiredCount).toBe(1); + expect(state.lastResyncRequired).toEqual({ + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }); + }); + + it('auto-skips delta events (session_update) while awaitingResync is true', () => { + // Step 1: daemon emits resync → flag set. + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + // Step 2: subsequent session_update would normally set + // `lastSessionUpdate` — but awaitingResync auto-skips it. + const skipped = reduceDaemonSessionEvent(afterResync, { + id: 13, + v: 1, + type: 'session_update', + data: { sessionId: 's-X', phase: 'prompting' }, + }); + // lastSessionUpdate unchanged (skipped), lastEventId DID advance. 
+ expect(skipped.lastSessionUpdate).toBeUndefined(); + expect(skipped.lastEventId).toBe(13); + expect(skipped.awaitingResync).toBe(true); + }); + + it('auto-skips permission_request while awaitingResync (no pendingPermissions mutation)', () => { + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const skipped = reduceDaemonSessionEvent(afterResync, { + id: 13, + v: 1, + type: 'permission_request', + data: { + requestId: 'req-stale', + sessionId: 's-1', + toolCall: { + toolCallId: 'tc-1', + status: 'pending', + title: 'Read /etc/passwd', + }, + options: [ + { + optionId: 'allow_once', + name: 'Allow once', + kind: 'allow_once', + }, + ], + }, + }); + // pendingPermissions stays empty — the permission_request was + // applied to stale state and we can't trust which permissions + // are still pending until loadSession recovery. + expect(skipped.pendingPermissions).toEqual({}); + }); + + it('still applies terminal events (session_died) while awaitingResync', () => { + // Critical: a session that DIES while in resync limbo must still + // be observable as dead. Otherwise UIs would render "loading + // resync state…" indefinitely while the underlying session is + // gone. + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const dead = reduceDaemonSessionEvent(afterResync, { + id: 14, + v: 1, + type: 'session_died', + data: { sessionId: 's-1', reason: 'channel_closed' }, + }); + expect(dead.alive).toBe(false); + expect(dead.terminalEvent?.type).toBe('session_died'); + // awaitingResync stays set (the consumer never recovered from + // resync — the session just died first). 
The terminal event + // takes precedence for UI rendering. + expect(dead.awaitingResync).toBe(true); + }); + + it('still applies stream_error while awaitingResync', () => { + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const errored = reduceDaemonSessionEvent(afterResync, { + v: 1, + type: 'stream_error', + data: { error: 'transport gone' }, + }); + expect(errored.alive).toBe(false); + expect(errored.streamError).toEqual({ error: 'transport gone' }); + }); + + it('reseeding view state via createDaemonSessionViewState clears awaitingResync (consumer recovery)', () => { + // Consumer recovery path: after observing awaitingResync, call + // loadSession (out of band) and reconstruct view state. The + // fresh state has the flag back to false. + const stale = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }); + expect(stale.awaitingResync).toBe(true); + // Consumer calls loadSession + builds fresh state from result. + const recovered = createDaemonSessionViewState({ + sessionId: 's-1', + lastEventId: 20, + }); + expect(recovered.awaitingResync).toBe(false); + expect(recovered.resyncRequiredCount).toBe(0); + }); + + it('a second state_resync_required increments resyncRequiredCount', () => { + // Repeated reconnect past the ring boundary — counter accumulates. 
+ let state = createDaemonSessionViewState(); + state = reduceDaemonSessionEvent(state, { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }); + state = reduceDaemonSessionEvent(state, { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 20, + earliestAvailableId: 100, + }, + }); + expect(state.resyncRequiredCount).toBe(2); + expect(state.lastResyncRequired?.lastDeliveredId).toBe(20); + }); + + it('rejects malformed state_resync_required payload via unrecognizedKnownEventCount', () => { + const state = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'state_resync_required', + data: { reason: 'ring_evicted' }, // missing lastDeliveredId/earliestAvailableId + }); + expect(state.unrecognizedKnownEventCount).toBe(1); + expect(state.awaitingResync).toBe(false); + }); + }); }); From 74412919ce8957dc1e9f048e13fe95d0a85c93b9 Mon Sep 17 00:00:00 2001 From: doudouOUC Date: Thu, 21 May 2026 01:01:17 +0800 Subject: [PATCH 3/6] fix(acp-bridge): preserve FsError structure over ACP wire (#4360 Codex round 2 fold-in) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adopts Codex review round 2 P2 finding on PR #4360 — fold-in to the F4 prereq scope per user's "a" decision. **Problem**: When the `BridgeFileSystem` adapter (introduced in #4334 fs adapter wiring) throws a structured `FsError` (e.g. `kind: 'untrusted_workspace'` / `kind: 'symlink_escape'` / `kind: 'file_too_large'`), the `@agentclientprotocol/sdk` default RPC error serialization only sends `error.message` as JSON-RPC -32603 "Internal error". The structured `kind` / `status` / `hint` fields on FsError are stripped on the way to the agent. 
Downstream impact: SDK consumers receiving the ACP error payload lose the typed discriminator and have to regex-match the human- readable message to dispatch UI (auth retry vs file picker vs proxy hint). This silently regresses what the FsError-typed contract was supposed to provide. **Fix**: At the bridge boundary (`BridgeClient.writeTextFile` and `BridgeClient.readTextFile`), catch errors from `this.fileSystem. writeText/readText` calls. Duck-type check for FsError shape (`err.name === 'FsError'` + `typeof err.kind === 'string'`); when matched, rethrow as ACP `RequestError(-32603, message, {errorKind, hint, status})`. The agent's RPC client now receives `data. errorKind` and can branch on the closed-enum kind. Cross-package note: FsError lives in `cli/src/serve/fs/errors.ts` and acp-bridge can't `import { FsError }` from cli (dependency inversion). Same duck-typing pattern that `mapDomainErrorToErrorKind` (status.ts) already applies to `TrustGateError` / `SkillError` for the same cross-package bundling reason — `instanceof` would fail across package boundaries when bundlers don't dedupe. **Code shape** ```typescript function isFsErrorShape(err: unknown): err is FsErrorShape { return ( err instanceof Error && err.name === 'FsError' && typeof (err as { kind?: unknown }).kind === 'string' ); } function preserveFsErrorOverAcp(err: unknown): never { if (isFsErrorShape(err)) { throw new RequestError(-32603, err.message, { errorKind: err.kind, ...(err.hint !== undefined ? { hint: err.hint } : {}), ...(err.status !== undefined ? { status: err.status } : {}), }); } throw err; } ``` Applied at both `if (this.fileSystem) { ... }` blocks (writeTextFile + readTextFile) — wrapped the adapter call in try/catch + `preserveFsErrorOverAcp(err)`. Non-FsError errors are rethrown unchanged (default ACP serialization is fine for unstructured errors; only the structured shape needs preservation). 
JSON-RPC code stays at -32603 (internal error) rather than mapping FsError.kind → JSON-RPC code. Rationale: the JSON-RPC standard defines only a handful of code values (-32700/-32600/-32601/-32602/ -32603 + a reserved range for application errors), and mapping ~10 FsError kinds to that narrow space is lossy. Instead the structured `data.errorKind` carries the semantic information SDK consumers need; JSON-RPC code remains the generic "an error happened" signal. **Tests** (+5 in `bridgeClient.test.ts`) - writeTextFile FsError → ACP RequestError with errorKind in data - readTextFile FsError preserving symlink_escape kind (no hint field present → not stamped, spread guard works) - non-FsError pass-through (plain Error stays plain Error, no RequestError wrap) - hint field preservation when present - defensive: error with `kind` field but wrong `name` does NOT get wrapped (e.g. PermissionForbiddenError happens to have a kind field internally — must NOT be confused for FsError) Verification: 113/113 acp-bridge tests pass (+5 new FsError- preservation tests). Full serve suite shows pre-existing F3-related failures unrelated to this change (verified in isolation). --- packages/acp-bridge/src/bridgeClient.test.ts | 165 +++++++++++++++++++ packages/acp-bridge/src/bridgeClient.ts | 82 ++++++++- 2 files changed, 245 insertions(+), 2 deletions(-) diff --git a/packages/acp-bridge/src/bridgeClient.test.ts b/packages/acp-bridge/src/bridgeClient.test.ts index e139d95067..4c76fcf17f 100644 --- a/packages/acp-bridge/src/bridgeClient.test.ts +++ b/packages/acp-bridge/src/bridgeClient.test.ts @@ -160,6 +160,171 @@ describe('BridgeClient — BridgeFileSystem injection seam (F1 step 5)', () => { }); }); + describe('FsError preservation over ACP wire (#4175 F4 prereq, Codex #4360 round 2)', () => { + // The fix scope: when `BridgeFileSystem.writeText` / + // `BridgeFileSystem.readText` throw a structured `FsError`, the + // BridgeClient must rethrow as ACP `RequestError` with `data. 
+ // errorKind` / `data.hint` / `data.status` preserved. Pre-fix + // the ACP SDK serialized only `error.message` so SDK consumers + // lost the discriminator and had to regex-match the message. + // + // FsError lives in `cli/src/serve/fs/errors.ts` — acp-bridge can't + // import it (cross-package dep inversion), so we synthesize the + // shape directly here. The duck typing in + // `preserveFsErrorOverAcp` keys on `err.name === 'FsError'` + + // `typeof err.kind === 'string'`. + + function makeFsError( + kind: string, + message: string, + extras: { hint?: string; status?: number } = {}, + ): Error { + const err = new Error(message); + err.name = 'FsError'; + (err as unknown as { kind: string }).kind = kind; + if (extras.hint !== undefined) { + (err as unknown as { hint: string }).hint = extras.hint; + } + if (extras.status !== undefined) { + (err as unknown as { status: number }).status = extras.status; + } + return err; + } + + it('writeTextFile rethrows FsError as ACP RequestError with errorKind in data', async () => { + const writeText = vi.fn(async (): Promise => { + throw makeFsError( + 'untrusted_workspace', + 'workspace is not trusted; write operations are forbidden', + { + status: 403, + hint: 'enable trust via createWorkspaceFileSystemFactory', + }, + ); + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + // Reshaped as JSON-RPC RequestError (-32603 = internal error) + // with structured data field. 
+ expect(err.name).toBe('RequestError'); + expect(err.code).toBe(-32603); + expect(err.message).toContain('not trusted'); + expect(err.data).toMatchObject({ + errorKind: 'untrusted_workspace', + status: 403, + hint: expect.any(String), + }); + }); + + it('readTextFile rethrows FsError preserving symlink_escape kind', async () => { + const readText = vi.fn(async (): Promise<never> => { + throw makeFsError( + 'symlink_escape', + 'symlink resolves outside workspace', + { status: 400 }, + ); + }); + const client = makeClient({ writeText: vi.fn(), readText }); + + const err = (await client + .readTextFile({ path: '/x', sessionId: 'sess:test' }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + expect(err.name).toBe('RequestError'); + expect(err.code).toBe(-32603); + expect(err.data).toMatchObject({ + errorKind: 'symlink_escape', + status: 400, + }); + // No `hint` field on this FsError → not stamped (spread guard). + expect((err.data as { hint?: unknown }).hint).toBeUndefined(); + }); + + it('passes non-FsError errors through unchanged (no RequestError wrap)', async () => { + // Plain Error → bridgeClient must NOT wrap it. Only structured + // FsError gets the reshape. ACP's default serialization is + // adequate for unstructured errors. + const writeText = vi.fn(async (): Promise<never> => { + throw new Error('boring generic failure'); + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + // Original Error preserved — no JSON-RPC code stamped. 
+ expect(err.name).toBe('Error'); + expect(err.message).toBe('boring generic failure'); + expect(err.code).toBeUndefined(); + expect(err.data).toBeUndefined(); + }); + + it('preserves hint field when present on the FsError', async () => { + const writeText = vi.fn(async (): Promise<never> => { + throw makeFsError( + 'file_too_large', + 'file of 6 MiB exceeds write cap of 5 MiB', + { hint: 'split large writes into bounded chunks', status: 413 }, + ); + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + expect((err.data as { hint?: string }).hint).toBe( + 'split large writes into bounded chunks', + ); + expect((err.data as { errorKind?: string }).errorKind).toBe( + 'file_too_large', + ); + }); + + it('does not wrap an error that LOOKS like FsError but has wrong name', async () => { + // Defensive: an unrelated error class with a `kind` field but + // a different `name` should fall through to the unstructured + // path. Prevents accidental wrapping of e.g. permission errors + // that happen to carry a `kind` discriminator. 
+ const writeText = vi.fn(async (): Promise<never> => { + const err = new Error('looks-similar'); + err.name = 'PermissionForbiddenError'; + (err as unknown as { kind: string }).kind = + 'designated_originator_mismatch'; + throw err; + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number }; + + expect(err.name).toBe('PermissionForbiddenError'); + expect(err.code).toBeUndefined(); + }); + }); + describe('inline fallback when fileSystem is omitted (regression guard)', () => { let tmpDir: string; beforeEach(async () => { diff --git a/packages/acp-bridge/src/bridgeClient.ts b/packages/acp-bridge/src/bridgeClient.ts index 4b8b911afb..c0792fe211 100644 --- a/packages/acp-bridge/src/bridgeClient.ts +++ b/packages/acp-bridge/src/bridgeClient.ts @@ -17,6 +17,7 @@ import type { WriteTextFileRequest, WriteTextFileResponse, } from '@agentclientprotocol/sdk'; +import { RequestError } from '@agentclientprotocol/sdk'; import type { BridgeEvent, EventBus } from './eventBus.js'; import type { BridgeFileSystem } from './bridgeFileSystem.js'; import { CANCEL_VOTE_SENTINEL } from './permissionMediator.js'; @@ -35,6 +36,67 @@ import type { import { CancelSentinelCollisionError } from './bridgeErrors.js'; import { writeStderrLine } from './internal/stderrLine.js'; +/** + * Duck-type check for `FsError` from `cli/src/serve/fs/errors.ts` + * (#4175 F4 prereq — Codex review on #4360 round 2). FsError lives + * in the `cli` package, but this class lives in `acp-bridge` — a + * direct `import { FsError }` would invert the dependency. We use + * the same `.name`-based duck typing that `mapDomainErrorToErrorKind` + * (status.ts) already applies to `TrustGateError` / `SkillError` + * for the same cross-package bundling reason. + * + * Without this preservation: when the `BridgeFileSystem` adapter + * throws an `FsError` (e.g. 
`kind: 'untrusted_workspace'`, `kind: + * 'symlink_escape'`, `kind: 'file_too_large'`), the ACP SDK's + * default RPC error path serializes only `error.message` as + * "Internal error" — the structured `kind` / `status` / `hint` are + * lost on the wire. SDK consumers downstream can no longer dispatch + * typed UI (auth retry vs file picker vs proxy hint) without + * regex-matching the human-readable message. + * + * With this preservation: the bridge boundary catches FsError, + * rethrows as ACP `RequestError(-32603, message, {errorKind, hint, + * status})`. The agent's RPC client receives `data.errorKind` and + * can branch on the closed-enum kind. JSON-RPC code stays at + * internal-error (-32603) since the bridge can't reliably map + * FsError.kind to a JSON-RPC error code shape — the structured + * `data` field is what carries semantic information for SDK + * consumers. + */ +interface FsErrorShape { + name: 'FsError'; + message: string; + kind: string; + status?: number; + hint?: string; +} + +function isFsErrorShape(err: unknown): err is FsErrorShape { + return ( + err instanceof Error && + err.name === 'FsError' && + typeof (err as { kind?: unknown }).kind === 'string' + ); +} + +/** + * Rethrow an FsError as a structured ACP `RequestError` so the + * agent's RPC client sees `data.errorKind` / `data.hint` / + * `data.status` rather than just the human-readable message. + * Non-FsError errors are rethrown unchanged — the default ACP + * serialization is fine for unstructured errors. + */ +function preserveFsErrorOverAcp(err: unknown): never { + if (isFsErrorShape(err)) { + throw new RequestError(-32603, err.message, { + errorKind: err.kind, + ...(err.hint !== undefined ? { hint: err.hint } : {}), + ...(err.status !== undefined ? 
{ status: err.status } : {}), + }); + } + throw err; +} + /** * #4175 F3 Commit 3 — translate the mediator's internal * `PermissionResolution` to the ACP-shaped `RequestPermissionResponse` @@ -647,7 +709,17 @@ export class BridgeClient implements Client { // injection and fall through to the inline path so pre-F1 behavior // is preserved verbatim where no adapter has been wired. if (this.fileSystem) { - return this.fileSystem.writeText(params); + // #4175 F4 prereq — preserve FsError structure over ACP wire + // (Codex review on #4360 round 2). Without this catch, an + // `FsError({kind:'untrusted_workspace'})` from the adapter + // would land at the agent as `{code:-32603, message:...}` with + // the kind/status/hint stripped. See `preserveFsErrorOverAcp` + // for the cross-package duck-typing rationale. + try { + return await this.fileSystem.writeText(params); + } catch (err) { + preserveFsErrorOverAcp(err); + } } // Stage 1 known divergence: this raw `fs.writeFile` reimplements file // I/O instead of delegating to core's filesystem service. The @@ -795,7 +867,13 @@ export class BridgeClient implements Client { // Mode A + channels + IDE companion fall through to the inline // proxy below. if (this.fileSystem) { - return this.fileSystem.readText(params); + // #4175 F4 prereq — preserve FsError structure over ACP wire. + // See sibling block in `writeTextFile` for rationale. + try { + return await this.fileSystem.readText(params); + } catch (err) { + preserveFsErrorOverAcp(err); + } } // Reject obviously-degenerate `limit` up front. 
Without this, // `sliceLineRange` hits the `end < start` path and returns an From 9e085e2fb07dde542fea364023851aa82e97d4de Mon Sep 17 00:00:00 2001 From: doudouOUC Date: Thu, 21 May 2026 01:41:26 +0800 Subject: [PATCH 4/6] fix: 7 wenshao/copilot review fold-ins on #4360 (1 Critical + 6 Suggestion) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adopts all 7 review threads from the first wenshao + Copilot review round on PR #4360. All technical fixes (no judgment calls). **[Critical] BridgeTimeoutError constructor blocks tsc** (wenshao PRRT_kwDOPB-92c6DfcRI) `server.test.ts:4670` called `new BridgeTimeoutError('initialize timed out')` but the constructor signature is `(label: string, timeoutMs: number)` — TS2554 blocked `tsc --noEmit` and `npm run build`. Fixed to `new BridgeTimeoutError('initialize', 5000)` per suggested fix; resulting message `"HttpAcpBridge initialize timed out after 5000ms"` still satisfies the existing `.toContain('timed out')` assertion. **[Suggestion] Copilot JSDoc package name** (Copilot PRRT_kwDOPB-92c6De-Sm, ToolCallEmitter.ts:210) JSDoc referenced `@qwen-code/core/mcp-tool` but the actual package is `@qwen-code/qwen-code-core` with the file at `packages/core/src/tools/mcp-tool.ts`. Updated the reference. **[Suggestion] Copilot errorKind type widening** (Copilot PRRT_kwDOPB-92c6De-Ro, events.ts:244) `DaemonStreamErrorData.errorKind` was typed as `string` and the JSDoc said "7-value" closed enum — but `DAEMON_ERROR_KINDS` actually has 8 values, and `SERVE_ERROR_KINDS` (daemon-side) has 9 (adds `stat_failed`). Typed as `DaemonErrorKind | (string & {})` for forward-compat: SDK consumers get IDE autocomplete on the known 8 kinds while still accepting future daemon-side additions (like `stat_failed`) without a type error. Updated JSDoc to accurately list 8 current values + call out the forward-compat widening. 
Side observation (NOT in scope of this PR): `DAEMON_ERROR_KINDS` (SDK) lacks `stat_failed` that exists in `SERVE_ERROR_KINDS` (daemon). That's a separate drift fix. **[Suggestion] TERMINAL wording misleading** (wenshao PRRT_kwDOPB-92c6Dj-JL, eventBus.ts:369) Comment called `state_resync_required` a "TERMINAL synthetic frame" but it's emitted FIRST (before replay) and the stream stays OPEN. Genuine terminals like `client_evicted` close the stream after the frame. Rewrote the comment per suggestion: "id-less synthetic frame... Unlike `client_evicted`, the stream stays OPEN" — so an oncall reading the source at 3am gets the right mental model. **[Suggestion] `_meta` merge dead code + stale reference** (wenshao PRRT_kwDOPB-92c6Dj-JF, server.ts:2569) The `existingMeta` merge reads `event._meta` at BridgeEvent top level, but ToolCallEmitter's `_meta` lives nested inside `event.data._meta` (publish path goes through `events.publish({type: 'session_update', data: params})`). In production `existingMeta` is always undefined — the merge is a forward-compat escape hatch, not an active merge. Also the comment referenced `extractServerTimestamp` (sdk-typescript) which grep confirms doesn't exist yet (it's planned in chiga0 PR #4353). Rewrote the comment block to (1) acknowledge no current producer sets `_meta` at the top level — it's a forward-compat hook for future envelope-level metadata; (2) drop the stale `extractServerTimestamp` reference and instead note that chiga0 PR #4353 plans the 3-location probe. Code shape unchanged (forward-compat spread stays). **[Suggestion] session_closed + client_evicted passthrough tests** (wenshao PRRT_kwDOPB-92c6Dj-JW, daemonEvents.test.ts:2284) `RESYNC_PASSTHROUGH_TYPES` has 5 members but only `session_died` and `stream_error` had passthrough tests. Added two missing tests: `session_closed` and `client_evicted` while awaitingResync. 
Critical because if a future refactor accidentally drops either from the set, a consumer in resync limbo would silently swallow the terminal signal and the UI would hang on "loading resync state…". **[Suggestion] readTextFile non-FsError passthrough test** (wenshao PRRT_kwDOPB-92c6Dj-JX, bridgeClient.test.ts:251) The non-FsError pass-through test only covered `writeTextFile`. Added a symmetric `readTextFile` test — the two `try/catch` blocks in `bridgeClient.ts` are independent, so test parity guards against divergent refactors (e.g. someone adding wrapping on one side but not the other). Verification - `packages/acp-bridge`: 6 files, 114/114 pass (+1 new readTextFile non-FsError test). - `packages/sdk-typescript`: 75/75 pass on daemonEvents.test.ts (+2 new session_closed / client_evicted passthrough tests). - `packages/cli/src/serve/server.test.ts`: 248 tests pass on touched cases (5 SSE / serverTimestamp / stream_error tests). Pre-existing F3 (#4335 merge) test failures unrelated to this PR's changes — verified by stash-test-restore on clean tree. - TypeScript clean on touched regions; `BridgeTimeoutError` 2-arg fix unblocks `tsc --noEmit` for the test file. 
--- packages/acp-bridge/src/bridgeClient.test.ts | 21 +++++++ packages/acp-bridge/src/eventBus.ts | 25 ++++---- .../session/emitters/ToolCallEmitter.ts | 8 ++- packages/cli/src/serve/server.test.ts | 6 +- packages/cli/src/serve/server.ts | 29 ++++++--- packages/sdk-typescript/src/daemon/events.ts | 26 +++++--- .../test/unit/daemonEvents.test.ts | 60 +++++++++++++++++++ 7 files changed, 145 insertions(+), 30 deletions(-) diff --git a/packages/acp-bridge/src/bridgeClient.test.ts b/packages/acp-bridge/src/bridgeClient.test.ts index 4c76fcf17f..57978a9bc0 100644 --- a/packages/acp-bridge/src/bridgeClient.test.ts +++ b/packages/acp-bridge/src/bridgeClient.test.ts @@ -272,6 +272,27 @@ describe('BridgeClient — BridgeFileSystem injection seam (F1 step 5)', () => { expect(err.data).toBeUndefined(); }); + it('readTextFile passes non-FsError errors through unchanged (wenshao #4360 review)', async () => { + // Symmetric guard for the read-side `preserveFsErrorOverAcp` + // call. The write- and read-side catch blocks are independent + // try/catch wrappers in `bridgeClient.ts`; if a future refactor + // diverges them (e.g. adds Error-wrapping to one but not the + // other), this test catches the read-side regression. 
+ const readText = vi.fn(async (): Promise<never> => { + throw new Error('generic read failure'); + }); + const client = makeClient({ writeText: vi.fn(), readText }); + + const err = (await client + .readTextFile({ path: '/x', sessionId: 'sess:test' }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + expect(err.name).toBe('Error'); + expect(err.message).toBe('generic read failure'); + expect(err.code).toBeUndefined(); + expect(err.data).toBeUndefined(); + }); + it('preserves hint field when present on the FsError', async () => { const writeText = vi.fn(async (): Promise<never> => { throw makeFsError( diff --git a/packages/acp-bridge/src/eventBus.ts b/packages/acp-bridge/src/eventBus.ts index ce6b024d49..b55df438df 100644 --- a/packages/acp-bridge/src/eventBus.ts +++ b/packages/acp-bridge/src/eventBus.ts @@ -366,21 +366,26 @@ export class EventBus { // caught up!") even though the SDK reducer's state was now // diverged from the daemon's truth. // - // Emit `state_resync_required` as a TERMINAL synthetic frame - // (no `id` — same pattern as `client_evicted` so it doesn't - // burn a slot in the per-session monotonic sequence other - // subscribers observe). The SDK reducer treats this as "your - // state is stale; call loadSession before applying any further - // deltas" — see `awaitingResync` flag in the SDK reducer. + // Emit `state_resync_required` as an id-less synthetic frame + // (no `id` — same no-burn pattern as `client_evicted`, so it + // doesn't occupy a slot in the per-session monotonic sequence + // other subscribers observe). **Unlike `client_evicted`, the + // stream stays OPEN after this frame** — the resync frame is + // emitted FIRST (before replay), and replay + live frames + // continue flowing afterward. The SDK reducer treats this as + // "your state is stale; call loadSession before applying any + // further deltas" — see `awaitingResync` flag in the SDK + // reducer. 
Wenshao review (#4360) corrected the prior wording + // that called this "TERMINAL" — that's misleading for oncall; + // `client_evicted` is genuinely terminal (closes stream), + // `state_resync_required` is recovery-oriented (keeps stream + // open). // // Replay continues after the resync frame (per design): the // SDK reducer will auto-skip delta application until // loadSession clears the flag, but the frames stay on the // wire so SDK has the option to compute a "what you missed" - // diff later. This is network-friendly (no extra reconnect) - // and matches the existing `client_evicted` semantics - // (terminal-on-close, but consumer can still drain queued - // frames first). + // diff later. This is network-friendly (no extra reconnect). const earliestInRing = this.ring[0]?.id; if ( earliestInRing !== undefined && diff --git a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts index 6f3c042c5a..f19786da73 100644 --- a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts +++ b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts @@ -205,9 +205,11 @@ export class ToolCallEmitter extends BaseEmitter { * - `subagentMeta` present → `'subagent'` (a Task tool / Codex * subagent / etc. wrapping its own tool calls) * - toolName matches `mcp__<serverName>__<toolName>` → `'mcp'` with - * `serverId: <serverName>`. Naming convention from `@qwen-code/core/ - * mcp-tool` — mirrors the SDK's same heuristic fallback so SDK - * consumers stay consistent with daemon classification. + * `serverId: <serverName>`. Naming convention from + * `packages/core/src/tools/mcp-tool.ts` in the + * `@qwen-code/qwen-code-core` package — mirrors the SDK's same + * heuristic fallback so SDK consumers stay consistent with + * daemon classification. 
* - everything else → `'builtin'` * * Static + pure so it can be unit-tested without an emitter diff --git a/packages/cli/src/serve/server.test.ts b/packages/cli/src/serve/server.test.ts index 636719d5c1..bbac0c96a3 100644 --- a/packages/cli/src/serve/server.test.ts +++ b/packages/cli/src/serve/server.test.ts @@ -4667,7 +4667,11 @@ describe('GET /session/:id/events (SSE)', () => { const bridge = fakeBridge({ async *subscribeImpl(_sessionId, _opts) { yield { id: 1, v: 1, type: 'session_update', data: 'first' }; - throw new BridgeTimeoutError('initialize timed out'); + // `BridgeTimeoutError(label, timeoutMs)` — 2 positional args + // (wenshao #4360 review). The resulting message is + // `"HttpAcpBridge initialize timed out after 5000ms"` which + // satisfies the `.toContain('timed out')` assertion below. + throw new BridgeTimeoutError('initialize', 5000); }, }); handle = await runQwenServe( diff --git a/packages/cli/src/serve/server.ts b/packages/cli/src/serve/server.ts index 808d2d9729..cd8476420d 100644 --- a/packages/cli/src/serve/server.ts +++ b/packages/cli/src/serve/server.ts @@ -2559,13 +2559,28 @@ function formatSseFrame(event: BridgeEvent | Omit<BridgeEvent, 'id'>): string { // ordering and "X minutes ago" UIs use the server's clock rather than // each client's drifting local clock. Stamped at the wire boundary // (NOT at EventBus.publish) so the in-memory `BridgeEvent` type stays - // unchanged and internal consumers don't see `_meta`. Pre-existing - // `_meta` keys on the event (e.g. tool_call carries `_meta.toolName` / - // `_meta.timestamp` from ToolCallEmitter) are preserved by spreading - // first. SDK consumers read via the 3-location probe in - // `extractServerTimestamp` (sdk-typescript) — `_meta.serverTimestamp` - // is one of the supported locations; the other two stay free for - // future use. + // unchanged and internal consumers don't see `_meta`. + // + // **Where `_meta` lives**: this code merges with `event._meta` at the + // BridgeEvent top level. 
**No current producer in the daemon sets + // `_meta` at the top level** — wenshao #4360 review noted that + // ToolCallEmitter's `_meta` lives nested at `event.data._meta` (the + // ACP `session/update` payload's own metadata), and `bridgeClient.ts` + // publishes via `events.publish({ type: 'session_update', data: + // params })` so the emitter's `_meta` stays inside `data`. The + // top-level merge here is a **forward-compat escape hatch** — if a + // future emitter ever wants to stamp envelope-level metadata, it can + // do so via `_meta` and this spread preserves it. Today + // `existingMeta` is always `undefined` in production and the spread + // is a no-op merge with `{}`. + // + // SDK consumer plan: a 3-location probe (event.serverTimestamp / + // event._meta.serverTimestamp / event.data._meta.serverTimestamp) + // is planned in chiga0's separate PR #4353 (not yet merged into + // `daemon_mode_b_main`). When that ships, SDK readers find this + // field at the `_meta.serverTimestamp` location. Until then this + // stamp is daemon-side only; SDK consumers can read it through an + // `as any` cast or wait for #4353. const existingMeta = (event as { _meta?: Record<string, unknown> })._meta; const stamped = { ...event, diff --git a/packages/sdk-typescript/src/daemon/events.ts b/packages/sdk-typescript/src/daemon/events.ts index e37e4065b1..409bbd639e 100644 --- a/packages/sdk-typescript/src/daemon/events.ts +++ b/packages/sdk-typescript/src/daemon/events.ts @@ -6,6 +6,7 @@ import type { DaemonEvent, + DaemonErrorKind, DaemonMcpTransport, PermissionOutcome, } from './types.js'; @@ -232,16 +233,23 @@ export interface DaemonStreamErrorData { error: string; /** * #4175 F4 prereq (chiga0 issue #19 P0). Classified error kind from - * the daemon's `mapDomainErrorToErrorKind` — one of the 7-value - * closed enum (`init_timeout` / `protocol_error` / `missing_binary` - * / `auth_env_error` / `blocked_egress` / `missing_file` / - * `parse_error`). 
Absent for unclassified errors — the daemon omits - * the field rather than stamping a meaningless value. UI consumers - * key on this for typed retry / remediation rendering (retry on - * init_timeout vs install on missing_binary, etc.) instead of - * regex-matching the `error` string. + * the daemon's `mapDomainErrorToErrorKind` — typed as the closed + * `DaemonErrorKind` enum (currently 8 values: `missing_binary` / + * `blocked_egress` / `auth_env_error` / `init_timeout` / + * `protocol_error` / `missing_file` / `parse_error` / + * `budget_exhausted`) with a `(string & {})` widening for forward- + * compat. A future daemon may emit additional kinds (e.g. + * `stat_failed` already exists on the daemon's `SERVE_ERROR_KINDS` + * but is not yet mirrored on this SDK constant) — the union shape + * preserves IDE autocomplete on the known values while accepting + * forward-compat strings without a type error. Absent for + * unclassified errors — the daemon omits the field rather than + * stamping a meaningless value. UI consumers key on this for typed + * retry / remediation rendering (retry on init_timeout vs install + * on missing_binary, etc.) instead of regex-matching the `error` + * string. 
*/ - errorKind?: string; + errorKind?: DaemonErrorKind | (string & {}); [key: string]: unknown; } diff --git a/packages/sdk-typescript/test/unit/daemonEvents.test.ts b/packages/sdk-typescript/test/unit/daemonEvents.test.ts index c04d4e16cd..3db1e17de4 100644 --- a/packages/sdk-typescript/test/unit/daemonEvents.test.ts +++ b/packages/sdk-typescript/test/unit/daemonEvents.test.ts @@ -2334,6 +2334,66 @@ describe('PR 21 — auth device-flow events', () => { expect(errored.streamError).toEqual({ error: 'transport gone' }); }); + it('still applies session_closed while awaitingResync (wenshao #4360 review)', () => { + // session_closed is in RESYNC_PASSTHROUGH_TYPES alongside + // session_died — terminal session lifecycle signals must still + // surface even when the consumer is in resync limbo. Otherwise + // a session that closes during resync would silently keep + // `alive: true` in view state and the UI would render "loading + // resync state…" indefinitely. + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const closed = reduceDaemonSessionEvent(afterResync, { + id: 15, + v: 1, + type: 'session_closed', + data: { sessionId: 's-1', reason: 'client_initiated' }, + }); + expect(closed.alive).toBe(false); + expect(closed.terminalEvent?.type).toBe('session_closed'); + // awaitingResync stays set (consumer never recovered) — the + // terminal event takes precedence for UI rendering but the + // resync flag remains as observability state. + expect(closed.awaitingResync).toBe(true); + }); + + it('still applies client_evicted while awaitingResync (wenshao #4360 review)', () => { + // client_evicted is the 5th member of RESYNC_PASSTHROUGH_TYPES. + // It happens when the subscriber's queue overflows (the daemon + // closes the stream after force-pushing the synthetic frame). 
+ // Even in resync limbo, the SDK must see the eviction so the + // adapter can stop pretending the stream is alive. + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const evicted = reduceDaemonSessionEvent(afterResync, { + v: 1, + type: 'client_evicted', + data: { reason: 'queue_overflow', droppedAfter: 17 }, + }); + expect(evicted.alive).toBe(false); + expect(evicted.terminalEvent?.type).toBe('client_evicted'); + }); + it('reseeding view state via createDaemonSessionViewState clears awaitingResync (consumer recovery)', () => { // Consumer recovery path: after observing awaitingResync, call // loadSession (out of band) and reconstruct view state. The From dce2fed0f55cc4ac7326338adb5a8cc71449e829 Mon Sep 17 00:00:00 2001 From: doudouOUC Date: Thu, 21 May 2026 09:55:17 +0800 Subject: [PATCH 5/6] fix: 3 wenshao observability fold-ins on #4360 (all Suggestion) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adopts all 3 threads from wenshao's second review round on PR #4360. All Suggestion-level — daemon-side observability + 1 missing SDK reducer test. **[Suggestion] SSE ring eviction silently emits state_resync_required** (PRRT_kwDOPB-92c6Dp_Uk, eventBus.ts:394) Pre-fix: when a consumer reconnects past the ring boundary, the daemon emits `state_resync_required` with zero stderr breadcrumb. A 3am oncall chasing "my UI is frozen with stale state" couldn't grep daemon logs to distinguish (a) ring undersized, (b) client reconnecting too slowly, (c) network partition causing repeated reconnects. Fix: detect `next.value.type === 'state_resync_required'` in the SSE handler's iter loop in `server.ts` and emit a `writeStderrLine` with the gap details (`lastEventId`, `earliestInRing`, computed `gap` count, `reason`). 
Logged at the route boundary rather than inside `EventBus.subscribe` to keep the bus implementation pure + concentrate daemon-side observability in the route handler that already logs socket errors + heartbeats. **[Suggestion] Bridge iterator throw forwarded to client but not logged daemon-side** (PRRT_kwDOPB-92c6Dp_Uo, server.ts:1956) Pre-fix inconsistency: the adjacent `res.on('error', ...)` handler at line ~1925 logs SSE socket errors with `writeStderrLine`, but the bridge-iterator-catch block at line ~1940-1965 sends a `stream_error` SSE frame to the client AND swallows the error daemon-side. When the bridge iterator throws (subprocess crash, channel protocol error, unhandled rejection), distinguishing "subprocess OOM-killed" from "protocol bug" required attaching a debugger. Fix: mirror the adjacent handler's pattern — add `writeStderrLine` before the `stream_error` SSE frame send, including the classified `errorKind` (when available) in brackets so operators can grep for `[init_timeout]` / `[missing_binary]` etc. **[Suggestion] No SDK reducer test verifying stream_error.errorKind flowthrough** (PRRT_kwDOPB-92c6Dp_Uq, daemonEvents.test.ts:2331) The daemon-side wire format is tested in `server.test.ts` (`parsed.data.errorKind === 'init_timeout'`) and `DaemonStreamErrorData` now declares `errorKind?`, but the SDK reducer test suite never fed a `stream_error` event with `errorKind` and asserted `state.streamError?.errorKind`. A future refactor stripping `errorKind` from the reducer's data assignment (e.g. spreading only `{error}`) would silently regress without test signal. Fix: added `captures errorKind on stream_error in view state` test exercising the full pipeline — reducer receives stream_error with errorKind, view state's `streamError.errorKind` matches. Verification - `packages/sdk-typescript`: 76/76 daemonEvents tests pass (+1 new flowthrough test). 
- `packages/cli/src/serve/server.test.ts`: 6 targeted serverTimestamp / stream_error / errorKind tests pass — server.ts changes are observability-only (no behavior change to wire format). - Pre-existing F3 (#4335 merge) test failures elsewhere are unrelated to this PR's changes. --- packages/cli/src/serve/server.ts | 45 +++++++++++++++++++ .../test/unit/daemonEvents.test.ts | 26 +++++++++++ 2 files changed, 71 insertions(+) diff --git a/packages/cli/src/serve/server.ts b/packages/cli/src/serve/server.ts index cd8476420d..a55eae9c79 100644 --- a/packages/cli/src/serve/server.ts +++ b/packages/cli/src/serve/server.ts @@ -1934,6 +1934,37 @@ export function createServeApp( const next = await iter!.next(); if (next.done) break; if (res.writableEnded) break; + // SSE ring eviction observability (#4360 wenshao review): + // EventBus.subscribe emits `state_resync_required` when a + // consumer reconnects with `Last-Event-ID` past the ring + // boundary. Without a daemon-side log, an operator chasing + // "my UI is frozen with stale state" has no breadcrumb to + // tell whether the ring is undersized, the client reconnects + // too slowly, or a network partition caused repeated + // reconnects. Log here (at the SSE write boundary) rather + // than inside EventBus — keeps the bus implementation pure + // and concentrates daemon-side observability in the route + // handler that already logs socket errors / heartbeats. + if (next.value.type === 'state_resync_required') { + const data = next.value.data as { + lastDeliveredId?: number; + earliestAvailableId?: number; + reason?: string; + }; + const gap = + typeof data.earliestAvailableId === 'number' && + typeof data.lastDeliveredId === 'number' + ? data.earliestAvailableId - data.lastDeliveredId - 1 + : undefined; + writeStderrLine( + `qwen serve: SSE ring eviction detected (session ${sessionId}): ` + + `lastEventId=${data.lastDeliveredId ?? '?'}, ` + + `earliestInRing=${data.earliestAvailableId ?? '?'}, ` + + `gap=${gap ?? 
'?'} events, ` + + `reason=${data.reason ?? '?'}. ` + + `Consumer must call loadSession to recover.`, + ); + } await writeWithBackpressure(formatSseFrame(next.value)); } } catch (err) { @@ -1953,6 +1984,20 @@ export function createServeApp( // rendering `error` text as before, so adding `errorKind` is // strictly additive / backward-compatible. const errorKind = mapDomainErrorToErrorKind(err); + // Bridge iterator error observability (#4360 wenshao review): + // forwarding the error to the client via `stream_error` is + // good for UX but leaves the daemon side blind. The adjacent + // `res.on('error', ...)` handler at line ~1925 already logs + // socket-level errors with `writeStderrLine`; mirror that + // pattern here for protocol/subprocess errors so operators + // can grep daemon stderr for "bridge iterator error" instead + // of attaching a debugger to distinguish "subprocess + // OOM-killed" from "bridge protocol bug". + writeStderrLine( + `qwen serve: bridge iterator error (session ${sessionId}): ` + + `${errorMessage(err)}` + + (errorKind ? ` [${errorKind}]` : ''), + ); await writeWithBackpressure( formatSseFrame({ v: 1, diff --git a/packages/sdk-typescript/test/unit/daemonEvents.test.ts b/packages/sdk-typescript/test/unit/daemonEvents.test.ts index 3db1e17de4..839aab4f3f 100644 --- a/packages/sdk-typescript/test/unit/daemonEvents.test.ts +++ b/packages/sdk-typescript/test/unit/daemonEvents.test.ts @@ -2334,6 +2334,32 @@ describe('PR 21 — auth device-flow events', () => { expect(errored.streamError).toEqual({ error: 'transport gone' }); }); + it('captures errorKind on stream_error in view state (wenshao #4360 review)', () => { + // The daemon stamps `errorKind` on `stream_error` payloads when + // classifiable (commit `14637cd79`, via `mapDomainErrorToErrorKind`). + // SDK consumers receiving these frames need `state.streamError. + // errorKind` to render typed retry/remediation UI (e.g. 
retry on + // init_timeout, install on missing_binary) without regex-matching + // the `error` message string. + // + // This test pins the flowthrough: the reducer's `stream_error` + // case must assign `event.data` as-is to `state.streamError`, + // preserving all fields including the optional `errorKind`. If + // a future refactor strips `errorKind` (e.g. by spreading only + // `{error}` instead of the full data object), this fails. + const state = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'stream_error', + data: { + error: 'initialize timed out after 5000ms', + errorKind: 'init_timeout', + }, + }); + expect(state.alive).toBe(false); + expect(state.streamError?.errorKind).toBe('init_timeout'); + expect(state.streamError?.error).toContain('timed out'); + }); + it('still applies session_closed while awaitingResync (wenshao #4360 review)', () => { // session_closed is in RESYNC_PASSTHROUGH_TYPES alongside // session_died — terminal session lifecycle signals must still From c37fe3aadd87797b31b4472b3abd7ea0d0d9b03f Mon Sep 17 00:00:00 2001 From: doudouOUC Date: Thu, 21 May 2026 10:28:05 +0800 Subject: [PATCH 6/6] test(serve): 2 wenshao observability fold-ins on #4360 (stderr log coverage) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adopts both threads from wenshao's third review round on PR #4360. Both Suggestion-level — pin the daemon-side stderr log artifacts that commit `dce2fed0f` introduced. Pre-fix: the EventBus-level state_resync_required emission was tested in eventBus.test.ts, and the SSE wire shape was tested in server.test.ts, but the actual operator-facing artifacts (the stderr log lines themselves) had no test coverage. A regression swapping operands in the `gap` arithmetic, dropping the sessionId from the log, or breaking the `[errorKind]` suffix would ship silently and only surface when an operator went grepping during an incident. 
**[Suggestion] SSE ring eviction stderr log untested** (PRRT_kwDOPB-92c6Dqtlb, server.ts:1948) Added 2 tests: - `writes a daemon-side stderr log on SSE ring eviction` — yields a `state_resync_required` frame from a fake bridge, spies on `process.stderr.write`, asserts the captured log contains `session sess-A` + `lastEventId=5` + `earliestInRing=12` + `gap=6 events` (pins the arithmetic) + `reason=ring_evicted` + `loadSession` (the recovery hint). - `falls back to "?" placeholders when state_resync_required data is partial` — yields a frame with empty `data: {}`, asserts every `?? '?'` branch fires (lastEventId=? / earliestInRing=? / gap=? events / reason=?). Defensive against future daemon schema changes that drop one of these fields. **[Suggestion] Bridge iterator error stderr log untested** (PRRT_kwDOPB-92c6Dqtlh, server.ts:1993) Added 2 tests: - `writes a daemon-side stderr log on bridge iterator error` — fake bridge throws plain `Error('agent died')` mid-stream, captures stderr, asserts the log contains `session sess-A` + `agent died`, and **no** `[…]` suffix (plain Error → `mapDomainErrorToErrorKind` returns undefined → no suffix). - `includes [errorKind] suffix in bridge iterator error log when classified` — fake bridge throws `BridgeTimeoutError('initialize', 5000)`, asserts the log contains `[init_timeout]`. Pins the classified-vs-unclassified branch of the conditional suffix template. All 4 tests use `vi.spyOn(process.stderr, 'write').mockReturnValue(true)` + filter `mock.calls` for the relevant log prefix — same pattern as the existing `mcp-client-manager.test.ts` stderr-spy tests in core, plus `startupProfiler.test.ts` in cli. Verification: 7/7 targeted observability tests pass. Pre-existing F3 (#4335 merge) test failures elsewhere are unrelated to this PR's changes. 
--- packages/cli/src/serve/server.test.ts | 172 ++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/packages/cli/src/serve/server.test.ts b/packages/cli/src/serve/server.test.ts index bbac0c96a3..24b753b89e 100644 --- a/packages/cli/src/serve/server.test.ts +++ b/packages/cli/src/serve/server.test.ts @@ -4687,6 +4687,178 @@ describe('GET /session/:id/events (SSE)', () => { expect(parsed.data.error).toContain('timed out'); }); + it('writes a daemon-side stderr log on SSE ring eviction (#4360 wenshao observability fold-in)', async () => { + // The SSE write loop detects `state_resync_required` frames and + // emits a stderr breadcrumb so operators can grep daemon logs for + // ring-eviction events. Test covers: + // - the `writeStderrLine` actually fires + // - the `gap` arithmetic (earliestAvailableId - lastDeliveredId - 1) + // - all four data fields (lastEventId / earliestInRing / gap / reason) + // - the sessionId is included + const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + await readSseFrames(res.body!, 1); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('SSE ring eviction detected')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + expect(line).toContain('session sess-A'); + expect(line).toContain('lastEventId=5'); + expect(line).toContain('earliestInRing=12'); + // gap = 12 - 5 - 1 = 6 events + 
expect(line).toContain('gap=6 events'); + expect(line).toContain('reason=ring_evicted'); + expect(line).toContain('loadSession'); + } finally { + stderrSpy.mockRestore(); + } + }); + + it('falls back to "?" placeholders when state_resync_required data is partial', async () => { + // Defensive: the `?? '?'` fallback for missing fields lets the log + // line still print intelligibly when the daemon emits a partial + // payload (e.g. a future schema change drops one field). Pins the + // placeholder behavior so a regression that crashes the log call + // is caught. + const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + v: 1, + type: 'state_resync_required', + // Intentionally missing all numeric fields + reason — + // exercises every `?? '?'` branch. + data: {} as unknown as { + reason: string; + lastDeliveredId: number; + earliestAvailableId: number; + }, + }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + await fetch(`http://127.0.0.1:${port}/session/sess-A/events`).then((r) => + readSseFrames(r.body!, 1), + ); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('SSE ring eviction detected')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + // All four `?? '?'` branches print `?` for the missing values. + expect(line).toContain('lastEventId=?'); + expect(line).toContain('earliestInRing=?'); + expect(line).toContain('gap=? 
events'); + expect(line).toContain('reason=?'); + } finally { + stderrSpy.mockRestore(); + } + }); + + it('writes a daemon-side stderr log on bridge iterator error (#4360 wenshao observability fold-in)', async () => { + // The bridge-iterator-catch block in the SSE handler now emits a + // `writeStderrLine` BEFORE sending the `stream_error` SSE frame so + // operators can distinguish "subprocess OOM-killed" from "protocol + // bug" via `grep "bridge iterator error"`. Test covers: + // - the log fires with the error message + // - the sessionId is included + // - NO `[errorKind]` suffix for unclassified errors (plain Error) + const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { id: 1, v: 1, type: 'session_update', data: 'first' }; + throw new Error('agent died'); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + await fetch(`http://127.0.0.1:${port}/session/sess-A/events`).then((r) => + readSseFrames(r.body!, 2), + ); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('bridge iterator error')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + expect(line).toContain('session sess-A'); + expect(line).toContain('agent died'); + // Plain Error → mapDomainErrorToErrorKind returns undefined → + // suffix branch must NOT add `[...]`. + expect(line).not.toMatch(/\[.*?\]/); + } finally { + stderrSpy.mockRestore(); + } + }); + + it('includes [errorKind] suffix in bridge iterator error log when classified (#4360 wenshao observability fold-in)', async () => { + // BridgeTimeoutError → classified as `init_timeout`. The log line + // must include `[init_timeout]` so operators can `grep '\[init_'` + // for that specific failure class. 
+ const { BridgeTimeoutError } = await import('@qwen-code/acp-bridge'); + const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { id: 1, v: 1, type: 'session_update', data: 'first' }; + throw new BridgeTimeoutError('initialize', 5000); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + await fetch(`http://127.0.0.1:${port}/session/sess-A/events`).then((r) => + readSseFrames(r.body!, 2), + ); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('bridge iterator error')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + expect(line).toContain('session sess-A'); + expect(line).toContain('timed out'); + expect(line).toContain('[init_timeout]'); + } finally { + stderrSpy.mockRestore(); + } + }); + it('forwards numeric Last-Event-ID even when supplied as a string', async () => { let seen: number | undefined; const bridge = fakeBridge({