diff --git a/packages/acp-bridge/src/bridgeClient.test.ts b/packages/acp-bridge/src/bridgeClient.test.ts index e139d95067..57978a9bc0 100644 --- a/packages/acp-bridge/src/bridgeClient.test.ts +++ b/packages/acp-bridge/src/bridgeClient.test.ts @@ -160,6 +160,192 @@ describe('BridgeClient — BridgeFileSystem injection seam (F1 step 5)', () => { }); }); + describe('FsError preservation over ACP wire (#4175 F4 prereq, Codex #4360 round 2)', () => { + // The fix scope: when `BridgeFileSystem.writeText` / + // `BridgeFileSystem.readText` throw a structured `FsError`, the + // BridgeClient must rethrow as ACP `RequestError` with `data. + // errorKind` / `data.hint` / `data.status` preserved. Pre-fix + // the ACP SDK serialized only `error.message` so SDK consumers + // lost the discriminator and had to regex-match the message. + // + // FsError lives in `cli/src/serve/fs/errors.ts` — acp-bridge can't + // import it (cross-package dep inversion), so we synthesize the + // shape directly here. The duck typing in + // `preserveFsErrorOverAcp` keys on `err.name === 'FsError'` + + // `typeof err.kind === 'string'`. 
+ + function makeFsError( + kind: string, + message: string, + extras: { hint?: string; status?: number } = {}, + ): Error { + const err = new Error(message); + err.name = 'FsError'; + (err as unknown as { kind: string }).kind = kind; + if (extras.hint !== undefined) { + (err as unknown as { hint: string }).hint = extras.hint; + } + if (extras.status !== undefined) { + (err as unknown as { status: number }).status = extras.status; + } + return err; + } + + it('writeTextFile rethrows FsError as ACP RequestError with errorKind in data', async () => { + const writeText = vi.fn(async (): Promise => { + throw makeFsError( + 'untrusted_workspace', + 'workspace is not trusted; write operations are forbidden', + { + status: 403, + hint: 'enable trust via createWorkspaceFileSystemFactory', + }, + ); + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + // Reshaped as JSON-RPC RequestError (-32603 = internal error) + // with structured data field. 
+ expect(err.name).toBe('RequestError'); + expect(err.code).toBe(-32603); + expect(err.message).toContain('not trusted'); + expect(err.data).toMatchObject({ + errorKind: 'untrusted_workspace', + status: 403, + hint: expect.any(String), + }); + }); + + it('readTextFile rethrows FsError preserving symlink_escape kind', async () => { + const readText = vi.fn(async (): Promise => { + throw makeFsError( + 'symlink_escape', + 'symlink resolves outside workspace', + { status: 400 }, + ); + }); + const client = makeClient({ writeText: vi.fn(), readText }); + + const err = (await client + .readTextFile({ path: '/x', sessionId: 'sess:test' }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + expect(err.name).toBe('RequestError'); + expect(err.code).toBe(-32603); + expect(err.data).toMatchObject({ + errorKind: 'symlink_escape', + status: 400, + }); + // No `hint` field on this FsError → not stamped (spread guard). + expect((err.data as { hint?: unknown }).hint).toBeUndefined(); + }); + + it('passes non-FsError errors through unchanged (no RequestError wrap)', async () => { + // Plain Error → bridgeClient must NOT wrap it. Only structured + // FsError gets the reshape. ACP's default serialization is + // adequate for unstructured errors. + const writeText = vi.fn(async (): Promise => { + throw new Error('boring generic failure'); + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + // Original Error preserved — no JSON-RPC code stamped. 
+ expect(err.name).toBe('Error'); + expect(err.message).toBe('boring generic failure'); + expect(err.code).toBeUndefined(); + expect(err.data).toBeUndefined(); + }); + + it('readTextFile passes non-FsError errors through unchanged (wenshao #4360 review)', async () => { + // Symmetric guard for the read-side `preserveFsErrorOverAcp` + // call. The write- and read-side catch blocks are independent + // try/catch wrappers in `bridgeClient.ts`; if a future refactor + // diverges them (e.g. adds Error-wrapping to one but not the + // other), this test catches the read-side regression. + const readText = vi.fn(async (): Promise => { + throw new Error('generic read failure'); + }); + const client = makeClient({ writeText: vi.fn(), readText }); + + const err = (await client + .readTextFile({ path: '/x', sessionId: 'sess:test' }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + expect(err.name).toBe('Error'); + expect(err.message).toBe('generic read failure'); + expect(err.code).toBeUndefined(); + expect(err.data).toBeUndefined(); + }); + + it('preserves hint field when present on the FsError', async () => { + const writeText = vi.fn(async (): Promise => { + throw makeFsError( + 'file_too_large', + 'file of 6 MiB exceeds write cap of 5 MiB', + { hint: 'split large writes into bounded chunks', status: 413 }, + ); + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number; data?: unknown }; + + expect((err.data as { hint?: string }).hint).toBe( + 'split large writes into bounded chunks', + ); + expect((err.data as { errorKind?: string }).errorKind).toBe( + 'file_too_large', + ); + }); + + it('does not wrap an error that LOOKS like FsError but has wrong name', async () => { + // Defensive: an unrelated error class with a `kind` field but + // a different `name` should fall through to the 
unstructured + // path. Prevents accidental wrapping of e.g. permission errors + // that happen to carry a `kind` discriminator. + const writeText = vi.fn(async (): Promise => { + const err = new Error('looks-similar'); + err.name = 'PermissionForbiddenError'; + (err as unknown as { kind: string }).kind = + 'designated_originator_mismatch'; + throw err; + }); + const client = makeClient({ writeText, readText: vi.fn() }); + + const err = (await client + .writeTextFile({ + path: '/x', + content: 'y', + sessionId: 'sess:test', + }) + .catch((e) => e)) as Error & { code?: number }; + + expect(err.name).toBe('PermissionForbiddenError'); + expect(err.code).toBeUndefined(); + }); + }); + describe('inline fallback when fileSystem is omitted (regression guard)', () => { let tmpDir: string; beforeEach(async () => { diff --git a/packages/acp-bridge/src/bridgeClient.ts b/packages/acp-bridge/src/bridgeClient.ts index 4b8b911afb..c0792fe211 100644 --- a/packages/acp-bridge/src/bridgeClient.ts +++ b/packages/acp-bridge/src/bridgeClient.ts @@ -17,6 +17,7 @@ import type { WriteTextFileRequest, WriteTextFileResponse, } from '@agentclientprotocol/sdk'; +import { RequestError } from '@agentclientprotocol/sdk'; import type { BridgeEvent, EventBus } from './eventBus.js'; import type { BridgeFileSystem } from './bridgeFileSystem.js'; import { CANCEL_VOTE_SENTINEL } from './permissionMediator.js'; @@ -35,6 +36,67 @@ import type { import { CancelSentinelCollisionError } from './bridgeErrors.js'; import { writeStderrLine } from './internal/stderrLine.js'; +/** + * Duck-type check for `FsError` from `cli/src/serve/fs/errors.ts` + * (#4175 F4 prereq — Codex review on #4360 round 2). FsError lives + * in the `cli` package, but this class lives in `acp-bridge` — a + * direct `import { FsError }` would invert the dependency. 
We use + * the same `.name`-based duck typing that `mapDomainErrorToErrorKind` + * (status.ts) already applies to `TrustGateError` / `SkillError` + * for the same cross-package bundling reason. + * + * Without this preservation: when the `BridgeFileSystem` adapter + * throws an `FsError` (e.g. `kind: 'untrusted_workspace'`, `kind: + * 'symlink_escape'`, `kind: 'file_too_large'`), the ACP SDK's + * default RPC error path serializes only `error.message` as + * "Internal error" — the structured `kind` / `status` / `hint` are + * lost on the wire. SDK consumers downstream can no longer dispatch + * typed UI (auth retry vs file picker vs proxy hint) without + * regex-matching the human-readable message. + * + * With this preservation: the bridge boundary catches FsError, + * rethrows as ACP `RequestError(-32603, message, {errorKind, hint, + * status})`. The agent's RPC client receives `data.errorKind` and + * can branch on the closed-enum kind. JSON-RPC code stays at + * internal-error (-32603) since the bridge can't reliably map + * FsError.kind to a JSON-RPC error code shape — the structured + * `data` field is what carries semantic information for SDK + * consumers. + */ +interface FsErrorShape { + name: 'FsError'; + message: string; + kind: string; + status?: number; + hint?: string; +} + +function isFsErrorShape(err: unknown): err is FsErrorShape { + return ( + err instanceof Error && + err.name === 'FsError' && + typeof (err as { kind?: unknown }).kind === 'string' + ); +} + +/** + * Rethrow an FsError as a structured ACP `RequestError` so the + * agent's RPC client sees `data.errorKind` / `data.hint` / + * `data.status` rather than just the human-readable message. + * Non-FsError errors are rethrown unchanged — the default ACP + * serialization is fine for unstructured errors. 
+ */ +function preserveFsErrorOverAcp(err: unknown): never { + if (isFsErrorShape(err)) { + throw new RequestError(-32603, err.message, { + errorKind: err.kind, + ...(err.hint !== undefined ? { hint: err.hint } : {}), + ...(err.status !== undefined ? { status: err.status } : {}), + }); + } + throw err; +} + /** * #4175 F3 Commit 3 — translate the mediator's internal * `PermissionResolution` to the ACP-shaped `RequestPermissionResponse` @@ -647,7 +709,17 @@ export class BridgeClient implements Client { // injection and fall through to the inline path so pre-F1 behavior // is preserved verbatim where no adapter has been wired. if (this.fileSystem) { - return this.fileSystem.writeText(params); + // #4175 F4 prereq — preserve FsError structure over ACP wire + // (Codex review on #4360 round 2). Without this catch, an + // `FsError({kind:'untrusted_workspace'})` from the adapter + // would land at the agent as `{code:-32603, message:...}` with + // the kind/status/hint stripped. See `preserveFsErrorOverAcp` + // for the cross-package duck-typing rationale. + try { + return await this.fileSystem.writeText(params); + } catch (err) { + preserveFsErrorOverAcp(err); + } } // Stage 1 known divergence: this raw `fs.writeFile` reimplements file // I/O instead of delegating to core's filesystem service. The @@ -795,7 +867,13 @@ export class BridgeClient implements Client { // Mode A + channels + IDE companion fall through to the inline // proxy below. if (this.fileSystem) { - return this.fileSystem.readText(params); + // #4175 F4 prereq — preserve FsError structure over ACP wire. + // See sibling block in `writeTextFile` for rationale. + try { + return await this.fileSystem.readText(params); + } catch (err) { + preserveFsErrorOverAcp(err); + } } // Reject obviously-degenerate `limit` up front. 
Without this, // `sliceLineRange` hits the `end < start` path and returns an diff --git a/packages/acp-bridge/src/eventBus.test.ts b/packages/acp-bridge/src/eventBus.test.ts index 029526b8f8..271219fc20 100644 --- a/packages/acp-bridge/src/eventBus.test.ts +++ b/packages/acp-bridge/src/eventBus.test.ts @@ -274,13 +274,21 @@ describe('EventBus', () => { // A `lastEventId: 0` resume with a queue cap larger than the ring // collects exactly 8000 live frames; ids start at 2 because id=1 // was the one shifted out of the ring. + // + // #4175 F4 prereq: `lastEventId: 0` + earliest-id-in-ring = 2 + // crosses the eviction-detection threshold (earliest > last + 1), + // so an extra synthetic `state_resync_required` frame is emitted + // FIRST. The filter below restricts to live ids, which excludes + // the synthetic (no id), so the original "8000 live frames" + // invariant is preserved. const abort = new AbortController(); const iter = bus.subscribe({ lastEventId: 0, maxQueued: 9000, signal: abort.signal, }); - const events = await collect(iter, 8000); + // Collect 8001 frames now: 1 synthetic resync + 8000 live. + const events = await collect(iter, 8001); abort.abort(); const liveIds = events .filter((e) => e.id !== undefined) @@ -288,6 +296,8 @@ describe('EventBus', () => { expect(liveIds).toHaveLength(8000); expect(liveIds[0]).toBe(2); expect(liveIds[liveIds.length - 1]).toBe(8001); + // The synthetic resync frame is the first one. + expect(events[0]?.type).toBe('state_resync_required'); }); it('eviction detaches the abort listener from a stalled consumer (BmJT1)', async () => { @@ -480,9 +490,138 @@ describe('EventBus', () => { const out: BridgeEvent[] = []; for await (const e of iter) { out.push(e); - if (out.length === 3) break; + // After the state_resync_required frame (synthetic) + 3 replay + // frames, we're done. + if (out.length === 4) break; } - expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + // First frame is the synthetic state_resync_required (no id). 
+ expect(out[0]?.type).toBe('state_resync_required'); + expect(out[0]?.id).toBeUndefined(); + // Then the 3 surviving ring frames. + expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]); abort.abort(); }); + + describe('state_resync_required (#4175 F4 prereq, Ilya0527 issue #15)', () => { + it('emits state_resync_required when lastEventId is past the ring head', async () => { + // Setup: ring holds 3, ids 1..5 published → ring contains [3,4,5]. + // Consumer reconnects with Last-Event-ID: 1 → event 2 was evicted. + // Daemon must emit state_resync_required FIRST so SDK reducer + // knows its state is stale before applying any replay frames. + const bus = new EventBus(3); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 1, + signal: abort.signal, + }); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 4) break; + } + // First frame is the resync frame (synthetic, no id). + expect(out[0]?.type).toBe('state_resync_required'); + expect(out[0]?.id).toBeUndefined(); + const data = out[0]?.data as { + reason: string; + lastDeliveredId: number; + earliestAvailableId: number; + }; + expect(data.reason).toBe('ring_evicted'); + expect(data.lastDeliveredId).toBe(1); + expect(data.earliestAvailableId).toBe(3); // event 2 was evicted + // Replay continues after the resync frame (per design — SDK can + // compute "what you missed" diff later) — so we still get the + // 3 surviving ring frames. + expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]); + abort.abort(); + }); + + it('does NOT emit state_resync_required when lastEventId is in the ring', async () => { + // Consumer's lastEventId is well within the ring → no gap → no + // resync needed. 
+ const bus = new EventBus(10); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 2, + signal: abort.signal, + }); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 3) break; + } + // No resync frame — just the 3 replay frames (ids 3, 4, 5). + expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); + abort.abort(); + }); + + it('does NOT emit state_resync_required at the exact boundary (lastEventId === earliest - 1)', async () => { + // Boundary: ring's earliest id is N, lastEventId is N-1. + // No gap → no resync. Off-by-one guard. + const bus = new EventBus(3); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + // Ring is now [3, 4, 5]. lastEventId=2 means "I have 1 and 2"; + // next expected is 3, which IS in the ring. No gap. + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 2, + signal: abort.signal, + }); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 3) break; + } + expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); + expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + abort.abort(); + }); + + it('does NOT emit state_resync_required when ring is empty', async () => { + // No publishes yet → earliestInRing is undefined → resync check + // skipped. Subscriber just waits for live events. + const bus = new EventBus(10); + const abort = new AbortController(); + const iter = bus.subscribe({ + lastEventId: 5, + signal: abort.signal, + }); + // Publish one live event AFTER subscribe to confirm the stream works. 
+ setTimeout(() => bus.publish({ type: 'foo', data: 1 }), 0); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 1) break; + } + // No resync frame — just the one live event. + expect(out[0]?.type).toBe('foo'); + expect(out[0]?.id).toBe(1); + abort.abort(); + }); + + it('does NOT emit state_resync_required when no lastEventId is provided (fresh subscribe)', async () => { + // First-time subscriber has no prior state to resync — resync + // would be meaningless. Check the no-lastEventId branch is + // skipped entirely. + const bus = new EventBus(3); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + const abort = new AbortController(); + const iter = bus.subscribe({ signal: abort.signal }); + // Live-only — publish one event after subscribe to give the + // iterator something to yield. + setTimeout(() => bus.publish({ type: 'foo', data: 99 }), 0); + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 1) break; + } + expect(out[0]?.type).toBe('foo'); + expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); + abort.abort(); + }); + }); }); diff --git a/packages/acp-bridge/src/eventBus.ts b/packages/acp-bridge/src/eventBus.ts index 861e02fbc1..b55df438df 100644 --- a/packages/acp-bridge/src/eventBus.ts +++ b/packages/acp-bridge/src/eventBus.ts @@ -357,6 +357,50 @@ export class EventBus { this.subs.add(sub); if (opts.lastEventId !== undefined) { + // Detect ring eviction on resume (#4175 F4 prereq, Ilya0527 + // issue #15): if the earliest event still in the ring has + // `id > lastEventId + 1`, then events between `lastEventId + 1` + // and `earliestInRing - 1` were evicted before the consumer + // reconnected — the consumer's reducer has a gap it doesn't + // know about. Pre-fix the resume silently succeeded ("you + // caught up!") even though the SDK reducer's state was now + // diverged from the daemon's truth. 
+ // + // Emit `state_resync_required` as an id-less synthetic frame + // (no `id` — same no-burn pattern as `client_evicted`, so it + // doesn't occupy a slot in the per-session monotonic sequence + // other subscribers observe). **Unlike `client_evicted`, the + // stream stays OPEN after this frame** — the resync frame is + // emitted FIRST (before replay), and replay + live frames + // continue flowing afterward. The SDK reducer treats this as + // "your state is stale; call loadSession before applying any + // further deltas" — see `awaitingResync` flag in the SDK + // reducer. Wenshao review (#4360) corrected the prior wording + // that called this "TERMINAL" — that's misleading for oncall; + // `client_evicted` is genuinely terminal (closes stream), + // `state_resync_required` is recovery-oriented (keeps stream + // open). + // + // Replay continues after the resync frame (per design): the + // SDK reducer will auto-skip delta application until + // loadSession clears the flag, but the frames stay on the + // wire so SDK has the option to compute a "what you missed" + // diff later. This is network-friendly (no extra reconnect). + const earliestInRing = this.ring[0]?.id; + if ( + earliestInRing !== undefined && + earliestInRing > opts.lastEventId + 1 + ) { + queue.forcePush({ + v: EVENT_SCHEMA_VERSION, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: opts.lastEventId, + earliestAvailableId: earliestInRing, + }, + }); + } // Force-push replay frames so they bypass the per-subscriber size // cap. 
The cap protects against a slow live consumer; replay is // already historical and silently dropping it would undermine the diff --git a/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts b/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts index 0188fdaa41..cf15142437 100644 --- a/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts +++ b/packages/cli/src/acp-integration/session/HistoryReplayer.test.ts @@ -259,6 +259,11 @@ describe('HistoryReplayer', () => { rawInput: { path: '/test.ts' }, _meta: { toolName: 'read_file', + // #4175 F4 prereq — ToolCallEmitter now stamps provenance + // on every tool_call / tool_call_update event so the UI can + // dispatch on builtin / mcp / subagent without string- + // matching toolName. + provenance: 'builtin', timestamp: toEpochMs(record.timestamp), }, }), @@ -314,6 +319,8 @@ describe('HistoryReplayer', () => { rawOutput: 'File contents here', _meta: { toolName: 'read_file', + // #4175 F4 prereq — provenance stamped on update events too. 
+ provenance: 'builtin', timestamp: toEpochMs(record.timestamp), }, }); diff --git a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts index 6acc302221..4b2e8a3698 100644 --- a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts +++ b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.test.ts @@ -6,7 +6,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; import { ToolCallEmitter } from './ToolCallEmitter.js'; -import type { SessionContext } from '../types.js'; +import type { SessionContext, SubagentMeta } from '../types.js'; import type { Config, ToolRegistry, @@ -77,7 +77,7 @@ describe('ToolCallEmitter', () => { locations: [], kind: 'other', rawInput: { arg1: 'value1' }, - _meta: { toolName: 'unknown_tool' }, + _meta: { toolName: 'unknown_tool', provenance: 'builtin' }, }); }); @@ -101,7 +101,7 @@ describe('ToolCallEmitter', () => { locations: [{ path: '/test/file.ts', line: 10 }], kind: 'edit', rawInput: { path: '/test.ts' }, - _meta: { toolName: 'edit_file' }, + _meta: { toolName: 'edit_file', provenance: 'builtin' }, }); }); @@ -125,7 +125,7 @@ describe('ToolCallEmitter', () => { expect(sendUpdateSpy).toHaveBeenCalledWith( expect.objectContaining({ rawInput: {}, - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -153,7 +153,7 @@ describe('ToolCallEmitter', () => { locations: [], // Fallback to empty kind: 'other', // Fallback to other rawInput: { invalid: true }, - _meta: { toolName: 'failing_tool' }, + _meta: { toolName: 'failing_tool', provenance: 'builtin' }, }); }); }); @@ -174,7 +174,7 @@ describe('ToolCallEmitter', () => { toolCallId: 'call-123', status: 'completed', rawOutput: 'Tool completed successfully', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -198,7 +198,7 @@ 
describe('ToolCallEmitter', () => { content: { type: 'text', text: 'Something went wrong' }, }, ], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); @@ -228,7 +228,7 @@ describe('ToolCallEmitter', () => { newText: 'new content', }, ], - _meta: { toolName: 'edit_file' }, + _meta: { toolName: 'edit_file', provenance: 'builtin' }, }), ); }); @@ -263,7 +263,7 @@ describe('ToolCallEmitter', () => { }, }, ], - _meta: { toolName: 'edit_file' }, + _meta: { toolName: 'edit_file', provenance: 'builtin' }, }), ); }); @@ -289,7 +289,7 @@ describe('ToolCallEmitter', () => { }, ], rawOutput: 'raw output', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -307,7 +307,7 @@ describe('ToolCallEmitter', () => { toolCallId: 'call-empty', status: 'completed', content: [], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); @@ -399,7 +399,7 @@ describe('ToolCallEmitter', () => { content: { type: 'text', text: 'Connection timeout' }, }, ], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); }); @@ -543,7 +543,7 @@ describe('ToolCallEmitter', () => { }, ], rawOutput: { unknownField: 'value', nested: { data: 123 } }, - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -565,7 +565,7 @@ describe('ToolCallEmitter', () => { toolCallId: 'call-extra', status: 'completed', rawOutput: 'Result text', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); @@ -580,7 +580,10 @@ describe('ToolCallEmitter', () => { const call = sendUpdateSpy.mock.calls[0][0]; expect(call.rawOutput).toBeUndefined(); - expect(call._meta).toEqual({ toolName: 'test_tool' }); + expect(call._meta).toEqual({ + toolName: 'test_tool', + provenance: 'builtin', + }); }); }); @@ -671,7 +674,7 @@ 
describe('ToolCallEmitter', () => { content: { type: 'text', text: 'Text content from message' }, }, ], - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }); }); @@ -703,10 +706,121 @@ describe('ToolCallEmitter', () => { }, ], rawOutput: 'raw result', - _meta: { toolName: 'test_tool' }, + _meta: { toolName: 'test_tool', provenance: 'builtin' }, }), ); }); }); }); + + describe('resolveToolProvenance (#4175 F4 prereq, chiga0 #19 P0)', () => { + // Pure static utility — exercise without an emitter instance. + it('classifies a plain tool name as builtin (no serverId)', () => { + const out = ToolCallEmitter.resolveToolProvenance('shell'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies a tool name without mcp__ prefix as builtin', () => { + const out = ToolCallEmitter.resolveToolProvenance('read_file'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies mcp____ as mcp with serverId', () => { + const out = ToolCallEmitter.resolveToolProvenance( + 'mcp__filesystem__read', + ); + expect(out).toEqual({ provenance: 'mcp', serverId: 'filesystem' }); + }); + + it('preserves underscores in the tool segment', () => { + // Server segment is `playwright`; tool segment is `take_screenshot` + // (with underscore inside the tool name — `split("__")` handles + // this because we split on the double-underscore delimiter). + const out = ToolCallEmitter.resolveToolProvenance( + 'mcp__playwright__take_screenshot', + ); + expect(out).toEqual({ provenance: 'mcp', serverId: 'playwright' }); + }); + + it('classifies malformed mcp__ prefix (only one segment) as builtin', () => { + // No double-underscore delimiter past the prefix → not a valid + // mcp tool name; fall back to builtin rather than stamping + // garbage serverId. 
+ const out = ToolCallEmitter.resolveToolProvenance('mcp__just_one'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies mcp____ as builtin (empty server segment)', () => { + const out = ToolCallEmitter.resolveToolProvenance('mcp____read'); + expect(out).toEqual({ provenance: 'builtin' }); + }); + + it('classifies any tool as subagent when subagentMeta is present', () => { + // subagent takes precedence over mcp__ naming — a sub-agent + // calling an MCP tool is rendered as "subagent block" not + // "MCP block" in the UI. + const out = ToolCallEmitter.resolveToolProvenance('mcp__fs__read', { + agentType: 'researcher', + } as unknown as SubagentMeta); + expect(out).toEqual({ provenance: 'subagent' }); + }); + + it('classifies a plain builtin tool with subagentMeta as subagent', () => { + const out = ToolCallEmitter.resolveToolProvenance('shell', { + agentType: 'coder', + } as unknown as SubagentMeta); + expect(out).toEqual({ provenance: 'subagent' }); + }); + }); + + describe('provenance stamping on emit (#4175 F4 prereq)', () => { + it('stamps provenance:mcp + serverId on emitStart for mcp__ tools', async () => { + await emitter.emitStart({ + toolName: 'mcp__github__create_issue', + callId: 'call-mcp', + args: { title: 'bug' }, + }); + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'tool_call', + _meta: expect.objectContaining({ + toolName: 'mcp__github__create_issue', + provenance: 'mcp', + serverId: 'github', + }), + }), + ); + }); + + it('stamps provenance:subagent (no serverId) when subagentMeta present', async () => { + await emitter.emitStart({ + toolName: 'shell', + callId: 'call-sub', + args: {}, + subagentMeta: { agentType: 'researcher' } as unknown as SubagentMeta, + }); + const call = sendUpdateSpy.mock.calls[0][0]; + expect(call._meta.provenance).toBe('subagent'); + expect(call._meta.serverId).toBeUndefined(); + }); + + it('stamps provenance on emitResult so reconnecting clients can 
re-derive it', async () => { + await emitter.emitResult({ + toolName: 'mcp__db__query', + callId: 'call-r', + success: true, + message: [], + }); + const call = sendUpdateSpy.mock.calls[0][0]; + expect(call._meta.provenance).toBe('mcp'); + expect(call._meta.serverId).toBe('db'); + }); + + it('stamps provenance on emitError as well', async () => { + await emitter.emitError('call-e', 'mcp__fs__write', new Error('boom')); + const call = sendUpdateSpy.mock.calls[0][0]; + expect(call._meta.provenance).toBe('mcp'); + expect(call._meta.serverId).toBe('fs'); + }); + }); }); diff --git a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts index 92f66ee474..f19786da73 100644 --- a/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts +++ b/packages/cli/src/acp-integration/session/emitters/ToolCallEmitter.ts @@ -57,6 +57,10 @@ export class ToolCallEmitter extends BaseEmitter { params.toolName, params.args, ); + const provenance = ToolCallEmitter.resolveToolProvenance( + params.toolName, + params.subagentMeta, + ); await this.sendUpdate({ sessionUpdate: 'tool_call', @@ -70,6 +74,8 @@ export class ToolCallEmitter extends BaseEmitter { _meta: { toolName: params.toolName, ...params.subagentMeta, + provenance: provenance.provenance, + ...(provenance.serverId ? 
{ serverId: provenance.serverId } : {}), ...(BaseEmitter.toEpochMs(params.timestamp) != null && { timestamp: BaseEmitter.toEpochMs(params.timestamp), }), @@ -124,6 +130,10 @@ export class ToolCallEmitter extends BaseEmitter { } // Build the update + const provenance = ToolCallEmitter.resolveToolProvenance( + params.toolName, + params.subagentMeta, + ); const update: Parameters[0] = { sessionUpdate: 'tool_call_update', toolCallId: params.callId, @@ -132,6 +142,8 @@ export class ToolCallEmitter extends BaseEmitter { _meta: { toolName: params.toolName, ...params.subagentMeta, + provenance: provenance.provenance, + ...(provenance.serverId ? { serverId: provenance.serverId } : {}), ...(BaseEmitter.toEpochMs(params.timestamp) != null && { timestamp: BaseEmitter.toEpochMs(params.timestamp), }), @@ -161,6 +173,10 @@ export class ToolCallEmitter extends BaseEmitter { error: Error, subagentMeta?: SubagentMeta, ): Promise { + const provenance = ToolCallEmitter.resolveToolProvenance( + toolName, + subagentMeta, + ); await this.sendUpdate({ sessionUpdate: 'tool_call_update', toolCallId: callId, @@ -171,10 +187,55 @@ export class ToolCallEmitter extends BaseEmitter { _meta: { toolName, ...subagentMeta, + provenance: provenance.provenance, + ...(provenance.serverId ? { serverId: provenance.serverId } : {}), }, }); } + /** + * Resolve a tool's provenance for UI dispatch on tool_call events + * (#4175 F4 prereq, chiga0 issue #19 P0). The SDK reads `_meta. + * provenance` + `_meta.serverId` to render builtin / MCP-server-badge / + * subagent-block differently. Without this stamping, the SDK falls + * back to string-matching the toolName which can't reliably + * distinguish builtin from subagent. + * + * Resolution rules: + * - `subagentMeta` present → `'subagent'` (a Task tool / Codex + * subagent / etc. wrapping its own tool calls) + * - toolName matches `mcp__<server>__<tool>` → `'mcp'` with + * `serverId: <server>`. 
Naming convention from + * `packages/core/src/tools/mcp-tool.ts` in the + * `@qwen-code/qwen-code-core` package — mirrors the SDK's same + * heuristic fallback so SDK consumers stay consistent with + * daemon classification. + * - everything else → `'builtin'` + * + * Static + pure so it can be unit-tested without an emitter + * instance. Exported via `ToolCallEmitter.resolveToolProvenance`. + */ + static resolveToolProvenance( + toolName: string, + subagentMeta?: SubagentMeta, + ): { provenance: 'builtin' | 'mcp' | 'subagent'; serverId?: string } { + if (subagentMeta !== undefined) { + return { provenance: 'subagent' }; + } + if (toolName.startsWith('mcp__')) { + // mcp__{server}__{tool} — split is "__", not single "_", + // so server / tool segments can contain single underscores. Require + // both a non-empty server segment and at least one segment past + // it; malformed names fall through to 'builtin' rather than + // stamping an empty/garbage serverId. + const parts = toolName.split('__'); + if (parts.length >= 3 && parts[1] && parts[1].length > 0) { + return { provenance: 'mcp', serverId: parts[1] }; + } + } + return { provenance: 'builtin' }; + } + // ==================== Public Utilities ==================== /** diff --git a/packages/cli/src/serve/server.test.ts b/packages/cli/src/serve/server.test.ts index 4567acebed..24b753b89e 100644 --- a/packages/cli/src/serve/server.test.ts +++ b/packages/cli/src/serve/server.test.ts @@ -4358,7 +4358,10 @@ describe('GET /session/:id/events (SSE)', () => { expect(frames).toHaveLength(2); expect(frames[0]?.id).toBe('1'); expect(frames[0]?.event).toBe('session_update'); - expect(JSON.parse(frames[0]!.data!)).toEqual({ + // `toMatchObject` rather than `toEqual` because the SSE write + // boundary stamps `_meta.serverTimestamp` (#4175 F4 prereq); + // a dedicated test below pins that field's shape. 
+ expect(JSON.parse(frames[0]!.data!)).toMatchObject({ id: 1, v: 1, type: 'session_update', @@ -4367,6 +4370,80 @@ describe('GET /session/:id/events (SSE)', () => { expect(frames[1]?.id).toBe('2'); }); + it('stamps _meta.serverTimestamp on every SSE frame (#4175 F4 prereq, chiga0 #19 P0)', async () => { + // The daemon stamps `_meta.serverTimestamp` at the SSE write + // boundary so multi-client UIs use the server clock for transcript + // ordering / "X minutes ago" instead of each client's drifting + // local clock. The chiga0 SDK PR #4353 reads this via a 3- + // location probe (`event.serverTimestamp` / `event._meta. + // serverTimestamp` / `event.data._meta.serverTimestamp`); we + // pick `_meta.serverTimestamp` (Anthropic convention) so the + // top-level event type stays unpolluted. + const before = Date.now(); + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + id: 1, + v: 1, + type: 'session_update', + data: { foo: 'bar' }, + }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + const frames = await readSseFrames(res.body!, 1); + const after = Date.now(); + + const parsed = JSON.parse(frames[0]!.data!); + expect(parsed._meta).toBeDefined(); + expect(typeof parsed._meta.serverTimestamp).toBe('number'); + // Server clock at stamp time must fall within the test's + // before/after wall-clock window. + expect(parsed._meta.serverTimestamp).toBeGreaterThanOrEqual(before); + expect(parsed._meta.serverTimestamp).toBeLessThanOrEqual(after); + }); + + it('preserves pre-existing _meta keys when stamping serverTimestamp', async () => { + // ToolCallEmitter (and other emitters) attach `_meta.toolName` etc. 
+ // The SSE boundary stamp must MERGE (not overwrite) so downstream + // consumers keep both fields. `BridgeEvent` doesn't type `_meta` + // explicitly (it's a wire-only escape hatch) so we cast the yield. + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + id: 1, + v: 1, + type: 'session_update', + data: { sessionUpdate: 'tool_call', toolCallId: 't1' }, + // Pre-existing _meta on the event (mimics ToolCallEmitter). + _meta: { toolName: 'Read', timestamp: 1234567890 }, + } as unknown as { id: 1; v: 1; type: 'session_update'; data: unknown }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + const frames = await readSseFrames(res.body!, 1); + + const parsed = JSON.parse(frames[0]!.data!); + // Both the pre-existing _meta keys AND serverTimestamp must survive. + expect(parsed._meta.toolName).toBe('Read'); + expect(parsed._meta.timestamp).toBe(1234567890); + expect(typeof parsed._meta.serverTimestamp).toBe('number'); + }); + it('forwards Last-Event-ID to the bridge', async () => { const seen: number[] = []; const bridge = fakeBridge({ @@ -4574,9 +4651,214 @@ describe('GET /session/:id/events (SSE)', () => { // it doesn't pollute the per-session monotonic sequence used for // Last-Event-ID resume. expect(frames[1]?.id).toBeUndefined(); + // `Error('agent died')` isn't classified by `mapDomainErrorToErrorKind` + // (no Bridge*Error class, no errno code, no special name), so no + // `errorKind` is stamped — only `error`. The next test covers the + // classified-error path. 
expect(JSON.parse(frames[1]!.data!).data).toEqual({ error: 'agent died' }); }); + it('stamps errorKind on stream_error when the thrown error is classified (#4175 F4 prereq, chiga0 #19 P0)', async () => { + // BridgeTimeoutError → `init_timeout` per mapDomainErrorToErrorKind. + // UI consumers can render "retry" on init_timeout vs "show stack + // trace" on unknown errors, without regex-matching the message + // string. + const { BridgeTimeoutError } = await import('@qwen-code/acp-bridge'); + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { id: 1, v: 1, type: 'session_update', data: 'first' }; + // `BridgeTimeoutError(label, timeoutMs)` — 2 positional args + // (wenshao #4360 review). The resulting message is + // `"HttpAcpBridge initialize timed out after 5000ms"` which + // satisfies the `.toContain('timed out')` assertion below. + throw new BridgeTimeoutError('initialize', 5000); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + const frames = await readSseFrames(res.body!, 2); + expect(frames[1]?.event).toBe('stream_error'); + const parsed = JSON.parse(frames[1]!.data!); + expect(parsed.data.errorKind).toBe('init_timeout'); + expect(parsed.data.error).toContain('timed out'); + }); + + it('writes a daemon-side stderr log on SSE ring eviction (#4360 wenshao observability fold-in)', async () => { + // The SSE write loop detects `state_resync_required` frames and + // emits a stderr breadcrumb so operators can grep daemon logs for + // ring-eviction events. 
Test covers: + // - the `writeStderrLine` actually fires + // - the `gap` arithmetic (earliestAvailableId - lastDeliveredId - 1) + // - all four data fields (lastEventId / earliestInRing / gap / reason) + // - the sessionId is included + const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + await readSseFrames(res.body!, 1); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('SSE ring eviction detected')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + expect(line).toContain('session sess-A'); + expect(line).toContain('lastEventId=5'); + expect(line).toContain('earliestInRing=12'); + // gap = 12 - 5 - 1 = 6 events + expect(line).toContain('gap=6 events'); + expect(line).toContain('reason=ring_evicted'); + expect(line).toContain('loadSession'); + } finally { + stderrSpy.mockRestore(); + } + }); + + it('falls back to "?" placeholders when state_resync_required data is partial', async () => { + // Defensive: the `?? '?'` fallback for missing fields lets the log + // line still print intelligibly when the daemon emits a partial + // payload (e.g. a future schema change drops one field). Pins the + // placeholder behavior so a regression that crashes the log call + // is caught. 
+ const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + v: 1, + type: 'state_resync_required', + // Intentionally missing all numeric fields + reason — + // exercises every `?? '?'` branch. + data: {} as unknown as { + reason: string; + lastDeliveredId: number; + earliestAvailableId: number; + }, + }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + await fetch(`http://127.0.0.1:${port}/session/sess-A/events`).then((r) => + readSseFrames(r.body!, 1), + ); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('SSE ring eviction detected')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + // All four `?? '?'` branches print `?` for the missing values. + expect(line).toContain('lastEventId=?'); + expect(line).toContain('earliestInRing=?'); + expect(line).toContain('gap=? events'); + expect(line).toContain('reason=?'); + } finally { + stderrSpy.mockRestore(); + } + }); + + it('writes a daemon-side stderr log on bridge iterator error (#4360 wenshao observability fold-in)', async () => { + // The bridge-iterator-catch block in the SSE handler now emits a + // `writeStderrLine` BEFORE sending the `stream_error` SSE frame so + // operators can distinguish "subprocess OOM-killed" from "protocol + // bug" via `grep "bridge iterator error"`. 
Test covers: + // - the log fires with the error message + // - the sessionId is included + // - NO `[errorKind]` suffix for unclassified errors (plain Error) + const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { id: 1, v: 1, type: 'session_update', data: 'first' }; + throw new Error('agent died'); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + await fetch(`http://127.0.0.1:${port}/session/sess-A/events`).then((r) => + readSseFrames(r.body!, 2), + ); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('bridge iterator error')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + expect(line).toContain('session sess-A'); + expect(line).toContain('agent died'); + // Plain Error → mapDomainErrorToErrorKind returns undefined → + // suffix branch must NOT add `[...]`. + expect(line).not.toMatch(/\[.*?\]/); + } finally { + stderrSpy.mockRestore(); + } + }); + + it('includes [errorKind] suffix in bridge iterator error log when classified (#4360 wenshao observability fold-in)', async () => { + // BridgeTimeoutError → classified as `init_timeout`. The log line + // must include `[init_timeout]` so operators can `grep '\[init_'` + // for that specific failure class. 
+ const { BridgeTimeoutError } = await import('@qwen-code/acp-bridge'); + const stderrSpy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + try { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { id: 1, v: 1, type: 'session_update', data: 'first' }; + throw new BridgeTimeoutError('initialize', 5000); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + await fetch(`http://127.0.0.1:${port}/session/sess-A/events`).then((r) => + readSseFrames(r.body!, 2), + ); + + const stderrLines = stderrSpy.mock.calls + .map((c) => String(c[0])) + .filter((s) => s.includes('bridge iterator error')); + expect(stderrLines.length).toBeGreaterThanOrEqual(1); + const line = stderrLines[0]!; + expect(line).toContain('session sess-A'); + expect(line).toContain('timed out'); + expect(line).toContain('[init_timeout]'); + } finally { + stderrSpy.mockRestore(); + } + }); + it('forwards numeric Last-Event-ID even when supplied as a string', async () => { let seen: number | undefined; const bridge = fakeBridge({ diff --git a/packages/cli/src/serve/server.ts b/packages/cli/src/serve/server.ts index 4a2bb10616..a55eae9c79 100644 --- a/packages/cli/src/serve/server.ts +++ b/packages/cli/src/serve/server.ts @@ -27,6 +27,7 @@ import { type DeviceFlowProviderId, type DeviceFlowPublicView, } from './auth/deviceFlow.js'; +import { mapDomainErrorToErrorKind } from '@qwen-code/acp-bridge'; import { QwenOAuthDeviceFlowProvider } from './auth/qwenDeviceFlowProvider.js'; import { createBridgeFileSystemAdapter } from './bridgeFileSystemAdapter.js'; import { createDaemonStatusProvider } from './daemonStatusProvider.js'; @@ -1933,6 +1934,37 @@ export function createServeApp( const next = await iter!.next(); if (next.done) break; if (res.writableEnded) break; + // SSE ring eviction observability (#4360 wenshao review): + // 
EventBus.subscribe emits `state_resync_required` when a + // consumer reconnects with `Last-Event-ID` past the ring + // boundary. Without a daemon-side log, an operator chasing + // "my UI is frozen with stale state" has no breadcrumb to + // tell whether the ring is undersized, the client reconnects + // too slowly, or a network partition caused repeated + // reconnects. Log here (at the SSE write boundary) rather + // than inside EventBus — keeps the bus implementation pure + // and concentrates daemon-side observability in the route + // handler that already logs socket errors / heartbeats. + if (next.value.type === 'state_resync_required') { + const data = next.value.data as { + lastDeliveredId?: number; + earliestAvailableId?: number; + reason?: string; + }; + const gap = + typeof data.earliestAvailableId === 'number' && + typeof data.lastDeliveredId === 'number' + ? data.earliestAvailableId - data.lastDeliveredId - 1 + : undefined; + writeStderrLine( + `qwen serve: SSE ring eviction detected (session ${sessionId}): ` + + `lastEventId=${data.lastDeliveredId ?? '?'}, ` + + `earliestInRing=${data.earliestAvailableId ?? '?'}, ` + + `gap=${gap ?? '?'} events, ` + + `reason=${data.reason ?? '?'}. ` + + `Consumer must call loadSession to recover.`, + ); + } await writeWithBackpressure(formatSseFrame(next.value)); } } catch (err) { @@ -1943,11 +1975,37 @@ export function createServeApp( // hard-coded `id: 0` would regress the client's `Last-Event-ID` // tracker. `formatSseFrame` omits the `id:` line when the input // event has no id. + // + // `errorKind` (#4175 F4 prereq, chiga0 issue #19 P0): stamp the + // classified error kind so UIs can render typed responses + // (auth retry / file picker / proxy hint / etc.) rather than + // regex-matching the human-readable `error` string. Returns + // `undefined` for unclassified errors — SDK falls back to + // rendering `error` text as before, so adding `errorKind` is + // strictly additive / backward-compatible. 
+ const errorKind = mapDomainErrorToErrorKind(err); + // Bridge iterator error observability (#4360 wenshao review): + // forwarding the error to the client via `stream_error` is + // good for UX but leaves the daemon side blind. The adjacent + // `res.on('error', ...)` handler at line ~1925 already logs + // socket-level errors with `writeStderrLine`; mirror that + // pattern here for protocol/subprocess errors so operators + // can grep daemon stderr for "bridge iterator error" instead + // of attaching a debugger to distinguish "subprocess + // OOM-killed" from "bridge protocol bug". + writeStderrLine( + `qwen serve: bridge iterator error (session ${sessionId}): ` + + `${errorMessage(err)}` + + (errorKind ? ` [${errorKind}]` : ''), + ); await writeWithBackpressure( formatSseFrame({ v: 1, type: 'stream_error', - data: { error: errorMessage(err) }, + data: { + error: errorMessage(err), + ...(errorKind ? { errorKind } : {}), + }, }), ).catch(() => {}); } @@ -2540,7 +2598,40 @@ function formatSseFrame(event: BridgeEvent | OmitId): string { // The SDK parser at `sdk-typescript/src/daemon/sse.ts` handles the // multi-line variant on the receive side — input/output asymmetry is // intentional. - const dataJson = JSON.stringify(event); + // + // `_meta.serverTimestamp` (#4175 F4 prereq, chiga0 issue #19 P0): stamp + // the daemon's wall-clock at SSE write time so multi-client transcript + // ordering and "X minutes ago" UIs use the server's clock rather than + // each client's drifting local clock. Stamped at the wire boundary + // (NOT at EventBus.publish) so the in-memory `BridgeEvent` type stays + // unchanged and internal consumers don't see `_meta`. + // + // **Where `_meta` lives**: this code merges with `event._meta` at the + // BridgeEvent top level. 
**No current producer in the daemon sets + // `_meta` at the top level** — wenshao #4360 review noted that + // ToolCallEmitter's `_meta` lives nested at `event.data._meta` (the + // ACP `session/update` payload's own metadata), and `bridgeClient.ts` + // publishes via `events.publish({ type: 'session_update', data: + // params })` so the emitter's `_meta` stays inside `data`. The + // top-level merge here is a **forward-compat escape hatch** — if a + // future emitter ever wants to stamp envelope-level metadata, it can + // do so via `_meta` and this spread preserves it. Today + // `existingMeta` is always `undefined` in production and the spread + // is a no-op merge with `{}`. + // + // SDK consumer plan: a 3-location probe (event.serverTimestamp / + // event._meta.serverTimestamp / event.data._meta.serverTimestamp) + // is planned in chiga0's separate PR #4353 (not yet merged into + // `daemon_mode_b_main`). When that ships, SDK readers find this + // field at the `_meta.serverTimestamp` location. Until then this + // stamp is daemon-side only; SDK consumers can read it through an + // `as any` cast or wait for #4353. + const existingMeta = (event as { _meta?: Record })._meta; + const stamped = { + ...event, + _meta: { ...(existingMeta ?? {}), serverTimestamp: Date.now() }, + }; + const dataJson = JSON.stringify(stamped); const idLine = 'id' in event && event.id !== undefined ? 
`id: ${event.id}\n` : ''; return `${idLine}event: ${event.type}\ndata: ${dataJson}\n\n`; diff --git a/packages/sdk-typescript/src/daemon/events.ts b/packages/sdk-typescript/src/daemon/events.ts index a3a2d7bf2c..409bbd639e 100644 --- a/packages/sdk-typescript/src/daemon/events.ts +++ b/packages/sdk-typescript/src/daemon/events.ts @@ -6,6 +6,7 @@ import type { DaemonEvent, + DaemonErrorKind, DaemonMcpTransport, PermissionOutcome, } from './types.js'; @@ -23,6 +24,18 @@ const DAEMON_KNOWN_EVENT_TYPE_VALUES = [ 'client_evicted', 'slow_client_warning', 'stream_error', + // #4175 F4 prereq (Ilya0527 issue #15) — terminal-style frame + // emitted by the daemon when an SSE consumer reconnects with a + // `Last-Event-ID` past the ring's earliest available id (the + // events between the consumer's last-seen id and the ring head + // were evicted before reconnect). The reducer treats this as + // "your accumulated state is stale; call `loadSession` and + // reseed view state before applying any further deltas". It does + // NOT close the stream — the daemon continues replaying surviving + // ring frames and live frames, but the reducer auto-skips them + // until the consumer reseeds state. Synthetic (no `id`) so it + // doesn't burn a slot in the per-session monotonic sequence. + 'state_resync_required', // PR 14b — MCP guardrail push events. See `mcp_guardrail_events` // capability tag. Both fire on the per-session SSE bus; consumers // should pre-flight `caps.features.includes('mcp_guardrail_events')` @@ -218,6 +231,51 @@ export interface DaemonSlowClientWarningData { export interface DaemonStreamErrorData { error: string; + /** + * #4175 F4 prereq (chiga0 issue #19 P0). 
Classified error kind from + * the daemon's `mapDomainErrorToErrorKind` — typed as the closed + * `DaemonErrorKind` enum (currently 8 values: `missing_binary` / + * `blocked_egress` / `auth_env_error` / `init_timeout` / + * `protocol_error` / `missing_file` / `parse_error` / + * `budget_exhausted`) with a `(string & {})` widening for forward- + * compat. A future daemon may emit additional kinds (e.g. + * `stat_failed` already exists on the daemon's `SERVE_ERROR_KINDS` + * but is not yet mirrored on this SDK constant) — the union shape + * preserves IDE autocomplete on the known values while accepting + * forward-compat strings without a type error. Absent for + * unclassified errors — the daemon omits the field rather than + * stamping a meaningless value. UI consumers key on this for typed + * retry / remediation rendering (retry on init_timeout vs install + * on missing_binary, etc.) instead of regex-matching the `error` + * string. + */ + errorKind?: DaemonErrorKind | (string & {}); + [key: string]: unknown; +} + +/** + * #4175 F4 prereq (Ilya0527 issue #15). Payload for the + * `state_resync_required` synthetic frame the daemon emits when an + * SSE consumer reconnects with a `Last-Event-ID` past the ring's + * earliest available id. The reducer auto-skips subsequent delta + * frames until consumer code calls `loadSession` and reseeds view + * state — see `DaemonSessionViewState.awaitingResync`. + */ +export interface DaemonStateResyncRequiredData { + /** + * Machine-readable resync reason. Currently always `'ring_evicted'` + * (the only case the daemon emits this frame for); reserved for + * future causes (e.g. `'schema_version_bump'`). + */ + reason: string; + /** Consumer's `Last-Event-ID` at reconnect time. */ + lastDeliveredId: number; + /** + * The earliest event id still in the daemon's per-session ring at + * reconnect time. The gap is `[lastDeliveredId + 1, + * earliestAvailableId - 1]` inclusive. 
+ */ + earliestAvailableId: number; [key: string]: unknown; } @@ -525,6 +583,10 @@ export type DaemonStreamErrorEvent = DaemonEventEnvelope< 'stream_error', DaemonStreamErrorData >; +export type DaemonStateResyncRequiredEvent = DaemonEventEnvelope< + 'state_resync_required', + DaemonStateResyncRequiredData +>; export type DaemonMcpBudgetWarningEvent = DaemonEventEnvelope< 'mcp_budget_warning', DaemonMcpBudgetWarningData @@ -613,7 +675,8 @@ export type DaemonControlEvent = export type DaemonStreamLifecycleEvent = | DaemonClientEvictedEvent | DaemonSlowClientWarningEvent - | DaemonStreamErrorEvent; + | DaemonStreamErrorEvent + | DaemonStateResyncRequiredEvent; /** * PR 14b: MCP guardrail push events. Grouped as their own union member @@ -763,6 +826,38 @@ export interface DaemonSessionViewState { * `forbiddenVotes`). */ forbiddenVoteCount: number; + /** + * #4175 F4 prereq (Ilya0527 issue #15). Set to true when the + * reducer observes a `state_resync_required` frame from the daemon + * (consumer reconnected with `Last-Event-ID` past the daemon's + * ring eviction point — events between last-delivered and ring- + * head were lost, so the accumulated view state is stale relative + * to the daemon's truth). + * + * While true, the reducer **auto-skips** all non-terminal delta + * events (still advances `lastEventId`) to prevent the consumer + * from rendering against a known-stale state. Terminal lifecycle + * events (`session_died` / `session_closed` / `client_evicted` / + * `stream_error`) still apply because they're critical end-of- + * stream signals that don't depend on prior state being current. + * + * Consumer recovery: when this is true, call `loadSession` to + * fetch the daemon's canonical session snapshot, then reconstruct + * view state via `createDaemonSessionViewState({...loaded state})`. + * The fresh state seed clears the flag implicitly (a new reducer + * instance starts fresh). 
+ */ + awaitingResync: boolean; + /** + * Count of `state_resync_required` frames this stream has observed. + * Typically 0 (no resync) or 1 (single ring-eviction event); + * higher counts indicate the consumer is reconnecting repeatedly + * past the ring boundary, which is itself a debuggable signal + * (network instability or ring sizing wrong for the workload). + */ + resyncRequiredCount: number; + /** Most recent resync payload (reason + gap range). */ + lastResyncRequired?: DaemonStateResyncRequiredData; } /** @@ -777,6 +872,34 @@ export interface DaemonSessionViewState { */ const MAX_FORBIDDEN_VOTES_PER_SESSION = 32; +/** + * #4175 F4 prereq (Ilya0527 issue #15). Event types that the reducer + * still processes when `awaitingResync` is true. Two categories: + * + * - **`state_resync_required` itself** — so the reducer can update + * `lastResyncRequired` / `resyncRequiredCount` for *subsequent* + * resync frames (rare but possible: a consumer that reconnects + * past the ring twice in succession). + * - **Terminal lifecycle frames** — `session_died` / `session_closed` + * / `client_evicted` / `stream_error`. Critical end-of-stream + * signals that don't depend on prior state being current. UIs + * must still see "this session died" even if they were in resync + * limbo at the time. + * + * Everything else (session_update / permission_* / approval_mode_changed + * / workspace mutations / mcp guardrail / auth flow events) is auto- + * skipped while `awaitingResync` is true; `lastEventId` still advances + * via `advanceLastEventId(base)` so the resync recovery sequence stays + * monotonic. 
+ */ +const RESYNC_PASSTHROUGH_TYPES = new Set([ + 'state_resync_required', + 'session_died', + 'session_closed', + 'client_evicted', + 'stream_error', +]); + export function createDaemonSessionViewState( seed: Partial = {}, ): DaemonSessionViewState { @@ -821,6 +944,14 @@ export function createDaemonSessionViewState( permissionVoteProgress: { ...seed.permissionVoteProgress }, forbiddenVotes: seed.forbiddenVotes ? [...seed.forbiddenVotes] : [], forbiddenVoteCount: seed.forbiddenVoteCount ?? 0, + // #4175 F4 prereq (Ilya0527 issue #15) — view state starts + // without a resync requirement unless the seed itself carries + // one (`seed.awaitingResync`). A consumer calling + // `createDaemonSessionViewState` after `loadSession` to recover + // from an earlier resync clears the flag through this default, + // provided it does not copy `awaitingResync` from the stale + // state into the seed. + awaitingResync: seed.awaitingResync ?? false, + resyncRequiredCount: seed.resyncRequiredCount ?? 0, + lastResyncRequired: seed.lastResyncRequired, }; } @@ -898,6 +1029,10 @@ export function asKnownDaemonEvent( return isStreamErrorData(event.data) ? (event as DaemonStreamErrorEvent) : undefined; + case 'state_resync_required': + return isStateResyncRequiredData(event.data) + ? (event as DaemonStateResyncRequiredEvent) + : undefined; case 'mcp_budget_warning': return isMcpBudgetWarningData(event.data) ? (event as DaemonMcpBudgetWarningEvent) @@ -974,6 +1109,20 @@ export function reduceDaemonSessionEvent( }; } + // #4175 F4 prereq (Ilya0527 issue #15). When `awaitingResync` is + // set, the consumer's accumulated view state is known stale — + // the daemon's ring evicted events between the consumer's last + // delivered id and reconnect. Auto-skip non-terminal delta events + // (still advance `lastEventId` via `base`) so the consumer doesn't + // render against stale state. Terminal lifecycle events still + // apply — they're critical end-of-stream signals that don't + // depend on prior state. 
The flag clears when the consumer calls + // `loadSession` and reconstructs view state via + // `createDaemonSessionViewState`. + if (base.awaitingResync && !RESYNC_PASSTHROUGH_TYPES.has(event.type)) { + return base; + } + switch (event.type) { case 'session_update': return { @@ -1169,6 +1318,24 @@ export function reduceDaemonSessionEvent( forbiddenVotes: [], forbiddenVoteCount: 0, }; + case 'state_resync_required': + // #4175 F4 prereq (Ilya0527 issue #15). Mark the accumulated + // view state as stale; subsequent non-terminal deltas are + // auto-skipped at the top-of-reducer gate above until consumer + // recovery via `loadSession` + `createDaemonSessionViewState`. + // `alive` and `terminalEvent` are NOT touched — the stream is + // still healthy; only the consumer's local accumulation is + // suspect. `pendingPermissions` is intentionally preserved + // (cleared by `loadSession`-driven recovery, not by the + // resync signal itself) so we don't synthesize a no-op + // "permission no longer pending" state transition while the + // consumer is still figuring out what's real. + return { + ...base, + awaitingResync: true, + resyncRequiredCount: base.resyncRequiredCount + 1, + lastResyncRequired: event.data, + }; case 'mcp_budget_warning': // Non-terminal: budget pressure is a status signal, not a stream // close. 
Count + capture latest so adapters can render @@ -1682,6 +1849,17 @@ function isClientEvictedData(value: unknown): value is DaemonClientEvictedData { ); } +function isStateResyncRequiredData( + value: unknown, +): value is DaemonStateResyncRequiredData { + return ( + isRecord(value) && + isNonEmptyString(value['reason']) && + isFiniteNumber(value['lastDeliveredId']) && + isFiniteNumber(value['earliestAvailableId']) + ); +} + function isSlowClientWarningData( value: unknown, ): value is DaemonSlowClientWarningData { diff --git a/packages/sdk-typescript/src/daemon/index.ts b/packages/sdk-typescript/src/daemon/index.ts index 51d2818978..bbfd6099a7 100644 --- a/packages/sdk-typescript/src/daemon/index.ts +++ b/packages/sdk-typescript/src/daemon/index.ts @@ -51,6 +51,10 @@ export type { DaemonClientEvictedData, DaemonClientEvictedEvent, DaemonControlEvent, + // #4175 F4 prereq (Ilya0527 issue #15) — daemon-emitted resync + // signal for SSE reconnects past the ring eviction boundary. + DaemonStateResyncRequiredData, + DaemonStateResyncRequiredEvent, DaemonMcpServerRestartedData, DaemonMcpServerRestartedEvent, DaemonMcpServerRestartRefusedData, diff --git a/packages/sdk-typescript/src/index.ts b/packages/sdk-typescript/src/index.ts index 58f32f086f..40049a0c90 100644 --- a/packages/sdk-typescript/src/index.ts +++ b/packages/sdk-typescript/src/index.ts @@ -44,6 +44,10 @@ export { type DaemonErrorKind, type DaemonClientEvictedData, type DaemonClientEvictedEvent, + // #4175 F4 prereq (Ilya0527 issue #15) — daemon-emitted resync + // signal for SSE reconnects past the ring eviction boundary. 
+ type DaemonStateResyncRequiredData, + type DaemonStateResyncRequiredEvent, type DaemonClientOptions, type DaemonContentHash, type DaemonControlEvent, diff --git a/packages/sdk-typescript/test/unit/daemonEvents.test.ts b/packages/sdk-typescript/test/unit/daemonEvents.test.ts index fce4bc668f..839aab4f3f 100644 --- a/packages/sdk-typescript/test/unit/daemonEvents.test.ts +++ b/packages/sdk-typescript/test/unit/daemonEvents.test.ts @@ -2192,4 +2192,290 @@ describe('PR 21 — auth device-flow events', () => { expect(state.lastEventId).toBe(99); }); }); + + describe('state_resync_required (#4175 F4 prereq, Ilya0527 issue #15)', () => { /* Suite overview: pins how reduceDaemonSessionEvent handles `state_resync_required` (flag, counter and payload bookkeeping), which events are skipped while awaitingResync is set (session_update, permission_request), which still apply (session_died, session_closed, stream_error, client_evicted), how a fresh view state clears the flag, and that a malformed payload is counted rather than applied. */ + it('sets awaitingResync + records the resync data when daemon emits state_resync_required', () => { /* Happy path: one well-formed event sets the flag, moves the counter to 1 and stores the payload as lastResyncRequired. */ + const state = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }); + expect(state.awaitingResync).toBe(true); + expect(state.resyncRequiredCount).toBe(1); + expect(state.lastResyncRequired).toEqual({ + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }); + }); + + it('auto-skips delta events (session_update) while awaitingResync is true', () => { + // Step 1: daemon emits resync → flag set. + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + // Step 2: subsequent session_update would normally set + // `lastSessionUpdate` — but awaitingResync auto-skips it. + const skipped = reduceDaemonSessionEvent(afterResync, { + id: 13, + v: 1, + type: 'session_update', + data: { sessionId: 's-X', phase: 'prompting' }, + }); + // lastSessionUpdate unchanged (skipped), lastEventId DID advance. 
+ expect(skipped.lastSessionUpdate).toBeUndefined(); + expect(skipped.lastEventId).toBe(13); + expect(skipped.awaitingResync).toBe(true); + }); + + it('auto-skips permission_request while awaitingResync (no pendingPermissions mutation)', () => { + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const skipped = reduceDaemonSessionEvent(afterResync, { + id: 13, + v: 1, + type: 'permission_request', + data: { + requestId: 'req-stale', + sessionId: 's-1', + toolCall: { + toolCallId: 'tc-1', + status: 'pending', + title: 'Read /etc/passwd', + }, + options: [ + { + optionId: 'allow_once', + name: 'Allow once', + kind: 'allow_once', + }, + ], + }, + }); + // pendingPermissions stays empty — the permission_request is + // skipped because it arrived against stale state and we can't trust + // which permissions are still pending until loadSession recovery. + expect(skipped.pendingPermissions).toEqual({}); + }); + + it('still applies terminal events (session_died) while awaitingResync', () => { + // Critical: a session that DIES while in resync limbo must still + // be observable as dead. Otherwise UIs would render "loading + // resync state…" indefinitely while the underlying session is + // gone. + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const dead = reduceDaemonSessionEvent(afterResync, { + id: 14, + v: 1, + type: 'session_died', + data: { sessionId: 's-1', reason: 'channel_closed' }, + }); + expect(dead.alive).toBe(false); + expect(dead.terminalEvent?.type).toBe('session_died'); + // awaitingResync stays set (the consumer never recovered from + // resync — the session just died first). 
The terminal event + // takes precedence for UI rendering. + expect(dead.awaitingResync).toBe(true); + }); + + it('still applies stream_error while awaitingResync', () => { /* The stream_error frame below carries no `id`; the assertions only check that `alive` flips to false and that the payload is stored as `streamError`. */ + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const errored = reduceDaemonSessionEvent(afterResync, { + v: 1, + type: 'stream_error', + data: { error: 'transport gone' }, + }); + expect(errored.alive).toBe(false); + expect(errored.streamError).toEqual({ error: 'transport gone' }); + }); + + it('captures errorKind on stream_error in view state (wenshao #4360 review)', () => { + // The daemon stamps `errorKind` on `stream_error` payloads when + // classifiable (commit `14637cd79`, via `mapDomainErrorToErrorKind`). + // SDK consumers receiving these frames need `state.streamError. + // errorKind` to render typed retry/remediation UI (e.g. retry on + // init_timeout, install on missing_binary) without regex-matching + // the `error` message string. + // + // This test pins the flowthrough: the reducer's `stream_error` + // case must assign `event.data` as-is to `state.streamError`, + // preserving all fields including the optional `errorKind`. If + // a future refactor strips `errorKind` (e.g. by spreading only + // `{error}` instead of the full data object), this fails. 
+ const state = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'stream_error', + data: { + error: 'initialize timed out after 5000ms', + errorKind: 'init_timeout', + }, + }); + expect(state.alive).toBe(false); + expect(state.streamError?.errorKind).toBe('init_timeout'); + expect(state.streamError?.error).toContain('timed out'); + }); + + it('still applies session_closed while awaitingResync (wenshao #4360 review)', () => { + // session_closed is in RESYNC_PASSTHROUGH_TYPES alongside + // session_died — terminal session lifecycle signals must still + // surface even when the consumer is in resync limbo. Otherwise + // a session that closes during resync would silently keep + // `alive: true` in view state and the UI would render "loading + // resync state…" indefinitely. + const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const closed = reduceDaemonSessionEvent(afterResync, { + id: 15, + v: 1, + type: 'session_closed', + data: { sessionId: 's-1', reason: 'client_initiated' }, + }); + expect(closed.alive).toBe(false); + expect(closed.terminalEvent?.type).toBe('session_closed'); + // awaitingResync stays set (consumer never recovered) — the + // terminal event takes precedence for UI rendering but the + // resync flag remains as observability state. + expect(closed.awaitingResync).toBe(true); + }); + + it('still applies client_evicted while awaitingResync (wenshao #4360 review)', () => { + // client_evicted is the 5th member of RESYNC_PASSTHROUGH_TYPES. + // It happens when the subscriber's queue overflows (the daemon + // closes the stream after force-pushing the synthetic frame). + // Even in resync limbo, the SDK must see the eviction so the + // adapter can stop pretending the stream is alive. 
+ const afterResync = reduceDaemonSessionEvent( + createDaemonSessionViewState(), + { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }, + ); + const evicted = reduceDaemonSessionEvent(afterResync, { + v: 1, + type: 'client_evicted', + data: { reason: 'queue_overflow', droppedAfter: 17 }, + }); + expect(evicted.alive).toBe(false); + expect(evicted.terminalEvent?.type).toBe('client_evicted'); + }); + + it('reseeding view state via createDaemonSessionViewState clears awaitingResync (consumer recovery)', () => { + // Consumer recovery path: after observing awaitingResync, call + // loadSession (out of band) and reconstruct view state. The + // fresh state has the flag back to false. + const stale = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }); + expect(stale.awaitingResync).toBe(true); + // Consumer calls loadSession + builds fresh state from result. + const recovered = createDaemonSessionViewState({ + sessionId: 's-1', + lastEventId: 20, + }); + expect(recovered.awaitingResync).toBe(false); + expect(recovered.resyncRequiredCount).toBe(0); + }); + + it('a second state_resync_required increments resyncRequiredCount', () => { + // Repeated reconnect past the ring boundary — counter accumulates. 
+ let state = createDaemonSessionViewState(); + state = reduceDaemonSessionEvent(state, { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 5, + earliestAvailableId: 12, + }, + }); + state = reduceDaemonSessionEvent(state, { + v: 1, + type: 'state_resync_required', + data: { + reason: 'ring_evicted', + lastDeliveredId: 20, + earliestAvailableId: 100, + }, + }); + expect(state.resyncRequiredCount).toBe(2); + expect(state.lastResyncRequired?.lastDeliveredId).toBe(20); + }); + + it('rejects malformed state_resync_required payload via unrecognizedKnownEventCount', () => { + const state = reduceDaemonSessionEvent(createDaemonSessionViewState(), { + v: 1, + type: 'state_resync_required', + data: { reason: 'ring_evicted' }, // missing lastDeliveredId/earliestAvailableId + }); + expect(state.unrecognizedKnownEventCount).toBe(1); + expect(state.awaitingResync).toBe(false); + }); + }); });