diff --git a/packages/acp-bridge/src/bridge.test.ts b/packages/acp-bridge/src/bridge.test.ts index 4388510e58..4fed850827 100644 --- a/packages/acp-bridge/src/bridge.test.ts +++ b/packages/acp-bridge/src/bridge.test.ts @@ -2029,6 +2029,309 @@ describe('createHttpAcpBridge', () => { await bridge.shutdown(); }); + it('echoes user_message_chunk to ALL session subscribers (cross-client sync)', async () => { + // Cross-client sync fix: a prompt sent by client A must be visible + // to every SSE subscriber of the same session — not just the + // originator. Before the fix, the interactive prompt path forwarded + // straight to the agent without publishing `user_message_chunk` to + // the bus, so peer clients (B, C, ...) never saw A's input. + const factory: ChannelFactory = async () => + makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel; + const bridge = makeBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + + const abortA = new AbortController(); + const abortB = new AbortController(); + const iterA = bridge.subscribeEvents(session.sessionId, { + signal: abortA.signal, + }); + const iterB = bridge.subscribeEvents(session.sessionId, { + signal: abortB.signal, + }); + + // Collect the first user_message_chunk each subscriber sees. + const firstUserChunk = async ( + iter: AsyncIterable<{ + type: string; + data: unknown; + originatorClientId?: string; + }>, + ): Promise<{ originatorClientId?: string; data: unknown }> => { + for await (const e of iter) { + if (e.type !== 'session_update') continue; + const update = (e.data as { update?: { sessionUpdate?: string } }) + ?.update; + if (update?.sessionUpdate === 'user_message_chunk') { + return { originatorClientId: e.originatorClientId, data: e.data }; + } + } + throw new Error('no user_message_chunk observed'); + }; + + const aPromise = firstUserChunk(iterA); + const bPromise = firstUserChunk(iterB); + + // Client A sends the prompt with its trusted clientId. + await bridge.sendPrompt( + session.sessionId, + { + sessionId: session.sessionId, + prompt: [{ type: 'text', text: 'hello from A' }], + }, + undefined, + { clientId: session.clientId }, + ); + + const [aChunk, bChunk] = await Promise.all([aPromise, bPromise]); + + // Both subscribers saw the user input echoed to the bus. + for (const chunk of [aChunk, bChunk]) { + const update = ( + chunk.data as { + update: { + sessionUpdate: string; + content: unknown; + _meta?: unknown; + }; + } + ).update; + expect(update.sessionUpdate).toBe('user_message_chunk'); + expect(update.content).toEqual({ type: 'text', text: 'hello from A' }); + // Originator stamp present so SDK `suppressOwnUserEcho` can dedup + // on the originator's own UI. + expect(chunk.originatorClientId).toBe(session.clientId); + // Source marker distinguishes the bridge echo from agent content. + expect((update._meta as { source?: string })?.source).toBe( + 'bridge-echo', + ); + } + + abortA.abort(); + abortB.abort(); + await bridge.shutdown(); + }); + + it('echoes one user_message_chunk per content block (multi-modal)', async () => { + const factory: ChannelFactory = async () => + makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel; + const bridge = makeBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + + const abort = new AbortController(); + const iter = bridge.subscribeEvents(session.sessionId, { + signal: abort.signal, + }); + + const collected: Array<{ sessionUpdate: string; content: unknown }> = []; + const drain = (async () => { + for await (const e of iter) { + if (e.type !== 'session_update') continue; + const update = ( + e.data as { update?: { sessionUpdate?: string; content?: unknown } } + )?.update; + if (update?.sessionUpdate === 'user_message_chunk') { + collected.push({ + sessionUpdate: update.sessionUpdate, + content: update.content, + }); + if (collected.length === 2) break; + } + } + })(); + + await bridge.sendPrompt( + session.sessionId, + { + sessionId: session.sessionId, + prompt: [ + { type: 'text', text: 'describe this' }, + { type: 'resource_link', uri: 'file:///x.png', name: 'x.png' }, + ], + }, + undefined, + { clientId: session.clientId }, + ); + + await drain; + // One echo frame per content block, in order. + expect(collected).toHaveLength(2); + expect(collected[0]?.content).toEqual({ + type: 'text', + text: 'describe this', + }); + expect(collected[1]?.content).toMatchObject({ type: 'resource_link' }); + + abort.abort(); + await bridge.shutdown(); + }); + + it('broadcasts prompt_cancelled with originator attribution on cancelSession', async () => { + // Cross-client sync: a cancel must surface as a first-class event + // so peer subscribers don't have to infer it from the absence of + // further agent chunks. + const factory: ChannelFactory = async () => + makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel; + const bridge = makeBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + + const abort = new AbortController(); + const iter = bridge.subscribeEvents(session.sessionId, { + signal: abort.signal, + }); + const firstCancel = (async () => { + for await (const e of iter) { + if (e.type === 'prompt_cancelled') return e; + } + throw new Error('no prompt_cancelled observed'); + })(); + + await bridge.cancelSession(session.sessionId, undefined, { + clientId: session.clientId, + }); + + const evt = await firstCancel; + expect(evt.type).toBe('prompt_cancelled'); + expect((evt.data as { sessionId: string }).sessionId).toBe( + session.sessionId, + ); + expect(evt.originatorClientId).toBe(session.clientId); + + abort.abort(); + await bridge.shutdown(); + }); + + it('broadcasts prompt_cancelled to peers when the originator SSE aborts mid-prompt', async () => { + // Cross-client sync: client disconnect (tab close / network drop / + // laptop sleep) is the most common cancel trigger in production. + // The `sendPrompt` `onAbort` path must publish `prompt_cancelled` + // to peer subscribers — not just the explicit `cancelSession` + // route. A regression here would silently re-open the gap. + let releasePrompt: (() => void) | undefined; + const factory: ChannelFactory = async () => + makeChannel({ + // Hang the prompt so it stays in-flight while we abort. + promptImpl: async () => { + await new Promise((res) => { + releasePrompt = res; + }); + return { stopReason: 'cancelled' }; + }, + }).channel; + const bridge = makeBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + + // Peer subscriber (a DIFFERENT client watching the same session). + const peerAbort = new AbortController(); + const peerIter = bridge.subscribeEvents(session.sessionId, { + signal: peerAbort.signal, + }); + const peerCancel = (async () => { + for await (const e of peerIter) { + if (e.type === 'prompt_cancelled') return e; + } + throw new Error('peer never saw prompt_cancelled'); + })(); + + // Originator sends the (hanging) prompt, then its SSE/HTTP signal + // aborts mid-flight (connection dropped). + const promptAbort = new AbortController(); + const promptPromise = bridge + .sendPrompt( + session.sessionId, + { + sessionId: session.sessionId, + prompt: [{ type: 'text', text: 'long running' }], + }, + promptAbort.signal, + { clientId: session.clientId }, + ) + .catch(() => { + // AbortError is expected — the originator's connection dropped. + }); + + // Give the queue worker a tick to start the prompt, then abort. + await new Promise((r) => setTimeout(r, 10)); + promptAbort.abort(); + + const evt = await peerCancel; + expect(evt.type).toBe('prompt_cancelled'); + expect((evt.data as { sessionId: string }).sessionId).toBe( + session.sessionId, + ); + // Attributed to the prompt's originator (whose connection dropped). + expect(evt.originatorClientId).toBe(session.clientId); + + // Let the hung promptImpl settle so shutdown doesn't wait on it. + releasePrompt?.(); + await promptPromise; + peerAbort.abort(); + await bridge.shutdown(); + }); + + it('stamps envelope originatorClientId on session_closed', async () => { + const factory: ChannelFactory = async () => makeChannel().channel; + const bridge = makeBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + + const abort = new AbortController(); + const iter = bridge.subscribeEvents(session.sessionId, { + signal: abort.signal, + }); + const firstClosed = (async () => { + for await (const e of iter) { + if (e.type === 'session_closed') return e; + } + throw new Error('no session_closed observed'); + })(); + + await bridge.closeSession(session.sessionId, { + clientId: session.clientId, + }); + + const evt = await firstClosed; + // Envelope-level stamp (new) — sibling events use this field. + expect(evt.originatorClientId).toBe(session.clientId); + // Back-compat `data.closedBy` retained. + expect((evt.data as { closedBy?: string }).closedBy).toBe( + session.clientId, + ); + + abort.abort(); + await bridge.shutdown(); + }); + + it('stamps envelope originatorClientId on session_metadata_updated', async () => { + const factory: ChannelFactory = async () => makeChannel().channel; + const bridge = makeBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + + const abort = new AbortController(); + const iter = bridge.subscribeEvents(session.sessionId, { + signal: abort.signal, + }); + const firstMeta = (async () => { + for await (const e of iter) { + if (e.type === 'session_metadata_updated') return e; + } + throw new Error('no session_metadata_updated observed'); + })(); + + bridge.updateSessionMetadata( + session.sessionId, + { displayName: 'renamed session' }, + { clientId: session.clientId }, + ); + + const evt = await firstMeta; + expect(evt.originatorClientId).toBe(session.clientId); + expect((evt.data as { displayName?: string }).displayName).toBe( + 'renamed session', + ); + + abort.abort(); + await bridge.shutdown(); + }); + it('overrides a stale sessionId in the body with the routing id', async () => { const handles: ChannelHandle[] = []; const factory: ChannelFactory = async () => { diff --git a/packages/acp-bridge/src/bridge.ts b/packages/acp-bridge/src/bridge.ts index e056645c89..7cdee67600 100644 --- a/packages/acp-bridge/src/bridge.ts +++ b/packages/acp-bridge/src/bridge.ts @@ -297,6 +297,109 @@ function writeServeDebugLine(message: string): void { const MAX_DISPLAY_NAME_LENGTH = 256; +/** + * Upper bound on how many prompt content blocks the bridge echoes per + * prompt. A programmatically-generated prompt with thousands of small + * blocks would otherwise trigger thousands of synchronous `publish()` + * fan-outs (each up to the per-bus subscriber cap) and flood the + * replay ring, evicting real history for every SSE subscriber. 256 is + * far above any human-authored prompt's block count. + */ +const MAX_ECHO_CONTENT_BLOCKS = 256; + +/** + * Echo a user prompt to the session bus so multi-client SSE subscribers + * see the input alongside the agent response. Iterates content blocks + * and emits one `user_message_chunk` per block, mirroring the shape the + * agent itself emits in the cron path (`Session.ts` cron handler) and + * the history-replay path (`HistoryReplayer`). The regular interactive + * `Session#executePrompt` was the historical outlier — it forwarded + * the prompt straight to the LLM without going through the session bus. + * + * Originator dedup: SDK consumers using `normalizeDaemonEvent` with + * `suppressOwnUserEcho: true` skip the echo for the originator (the + * envelope-level `originatorClientId` matches their own clientId). + * + * Source marker: `_meta.source: 'bridge-echo'` lets downstream tooling + * distinguish bridge-synthesized echoes from agent-emitted content if + * needed (e.g., for replay-deduplication when the agent later catches + * up and emits the same chunk through `HistoryReplayer`). + */ +function echoPromptToSessionBus( + entry: SessionEntry, + req: PromptRequest, + originatorClientId: string | undefined, +): void { + // `PromptRequest.prompt` is a non-optional `ContentBlock[]` per the + // ACP type contract — read it directly so a future SDK bump that + // makes it optional surfaces as a TypeScript error rather than being + // silently swallowed by an `unknown` cast. + const prompt = req.prompt; + if (prompt.length === 0) return; + const serverTimestamp = Date.now(); + const blockCount = Math.min(prompt.length, MAX_ECHO_CONTENT_BLOCKS); + for (let i = 0; i < blockCount; i += 1) { + const part = prompt[i]; + if (!part || typeof part !== 'object' || Array.isArray(part)) continue; + // Every `ContentBlock` variant (text, image, audio, resource) is + // published to the bus verbatim. The SDK's `normalizeDaemonEvent` + // accepts any `content` shape; rich rendering of non-text blocks is + // the consumer's responsibility. (Core's first-class multimodal + // user-content emit is tracked separately in PR #4353 §D — that + // affects the agent-side replay path, not this bridge echo.) + try { + entry.events.publish({ + type: 'session_update', + data: { + sessionId: req.sessionId, + update: { + sessionUpdate: 'user_message_chunk', + content: part, + _meta: { serverTimestamp, source: 'bridge-echo' }, + }, + }, + ...(originatorClientId ? { originatorClientId } : {}), + }); + } catch { + // bus may be closed (session being torn down); ignore — the + // prompt forward still proceeds. + } + } +} + +/** + * Publish a `prompt_cancelled` event to the session bus so peer SSE + * subscribers observe the cancel as a first-class event instead of + * inferring it from the absence of further `agent_message_chunk` + * frames. + * + * Semantic: this signals **cancel REQUESTED**, not **cancel + * confirmed** — it's published before the ACP `cancel` notification is + * forwarded/awaited (so peers learn promptly even if the agent is slow + * to wind down or the channel is dead). If a consumer needs hard + * confirmation it should observe the subsequent terminal + * `tool_call_update` / `agent_message_chunk` quiescence. + * + * `originatorClientId` identifies the cancelling client. Used by both + * the explicit `cancelSession` route and the `sendPrompt` abort path + * (originator SSE disconnect) so neither cancel route is a silent gap. + */ +function broadcastPromptCancelled( + entry: SessionEntry, + sessionId: string, + originatorClientId: string | undefined, +): void { + try { + entry.events.publish({ + type: 'prompt_cancelled', + data: { sessionId }, + ...(originatorClientId ? { originatorClientId } : {}), + }); + } catch { + /* bus closed */ + } +} + function hasControlCharacter(value: string): boolean { for (let i = 0; i < value.length; i += 1) { const code = value.charCodeAt(i); @@ -1902,6 +2005,26 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { } else { entry.activePromptOriginatorClientId = originatorClientId; } + // Echo the user prompt to the session bus so other SSE-subscribed + // clients see the input alongside the agent response. + // + // The interactive prompt path was the only one not emitting + // `user_message_chunk` — `Session#executePrompt` (the agent + // side) forwards the prompt directly to the LLM; the cron path + // (Session.ts:1402) and `HistoryReplayer` (line 65) emit it + // explicitly. Without this echo, multi-client UIs only saw + // assistant text from peer prompts — no record of who said what. + // + // Originator dedup: SDK consumers' `normalizeDaemonEvent` with + // `suppressOwnUserEcho: true` filters the echo when + // `event.originatorClientId === opts.clientId`. So the + // originator's local UI doesn't double-render its own input. + // + // Multi-modal: one envelope per content block. Non-text blocks + // pass through verbatim (the agent's Core multimodal echo is a + // separate follow-up tracked in PR #4353 §D); for now the + // common text path is the immediate fix. + echoPromptToSessionBus(entry, normalized, originatorClientId); const promptPromise = entry.connection .prompt(normalized) .finally(() => { @@ -1945,6 +2068,15 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { // forever (the agent is stuck waiting on a vote that no SSE // subscriber will ever cast). const onAbort = () => { + // Broadcast the cancel on the abort path too — client + // disconnect (SSE drop / tab close / laptop sleep) is the most + // common cancel trigger in production, and previously this path + // resolved permissions + forwarded ACP cancel WITHOUT telling + // peer SSE subscribers, leaving them in the exact + // silent-absence-of-chunks state this work set out to fix. + // `originatorClientId` here is the prompt's own originator (the + // client whose connection dropped). + broadcastPromptCancelled(entry, sessionId, originatorClientId); cancelPendingForSession(sessionId); entry.connection.cancel({ sessionId }).catch(() => { // Cancel is fire-and-forget; the agent may already be dead. @@ -1988,10 +2120,29 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { async cancelSession(sessionId, req, context) { const entry = byId.get(sessionId); if (!entry) throw new SessionNotFoundError(sessionId); - resolveTrustedClientId(entry, context?.clientId); - // Validation-only: cancellation resolves permissions as system - // cancellations, so those generated events intentionally omit an - // originator client id. + const cancelOriginatorClientId = resolveTrustedClientId( + entry, + context?.clientId, + ); + // Broadcast `prompt_cancelled` so other SSE-subscribed clients see + // the cancel as a first-class event rather than inferring it from + // the absence of further `agent_message_chunk` frames. Mirrors + // `session_closed` — same audit gap (cross-client sync audit, + // 2026-05-24). Published before the ACP cancel forward (see the + // "cancel requested, not confirmed" semantic in + // `broadcastPromptCancelled`). + // + // Unconditional by design: not gated on `activePromptOriginatorClientId` + // because that field is only set when the active prompt carried an + // originator — gating on it would drop the broadcast for anonymous + // active prompts. A cancel against a genuinely idle session is a + // harmless no-op that consumers treat idempotently. + // + // The pending-permission resolution below intentionally omits the + // originator stamp (those resolutions are system-initiated, not + // user-voted); this top-level `prompt_cancelled` carries the + // cancelling client so peer UIs can attribute it. + broadcastPromptCancelled(entry, sessionId, cancelOriginatorClientId); // ACP spec: cancelling a prompt MUST resolve outstanding // requestPermission calls with outcome.cancelled. Do this *before* // forwarding the notification so the agent's wind-down sees the @@ -2261,8 +2412,13 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { data: { sessionId, reason: 'client_close', + // `data.closedBy` is kept for back-compat with existing + // wire consumers; new code should read envelope-level + // `originatorClientId` (matches `session_metadata_updated`, + // `model_switched`, `approval_mode_changed`, etc.). ...(originatorClientId ? { closedBy: originatorClientId } : {}), }, + ...(originatorClientId ? { originatorClientId } : {}), }); } catch { /* bus already closed */ @@ -2289,9 +2445,16 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { updateSessionMetadata(sessionId, metadata, context) { const entry = byId.get(sessionId); if (!entry) throw new SessionNotFoundError(sessionId); - if (context?.clientId !== undefined) { - resolveTrustedClientId(entry, context.clientId); - } + // Capture the trusted originator so the broadcast envelope can + // attribute the change to a specific client (parity with + // `model_switched`, `approval_mode_changed`, etc., which stamp + // envelope-level `originatorClientId`). Prior to this, the + // metadata broadcast had no originator stamp at all — UIs + // couldn't tell which client renamed the session. + const metadataOriginatorClientId = + context?.clientId !== undefined + ? resolveTrustedClientId(entry, context.clientId) + : undefined; if (metadata.displayName !== undefined) { if ( typeof metadata.displayName !== 'string' || @@ -2322,6 +2485,9 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { entry.events.publish({ type: 'session_metadata_updated', data: { sessionId, displayName: entry.displayName }, + ...(metadataOriginatorClientId + ? { originatorClientId: metadataOriginatorClientId } + : {}), }); } catch { /* bus already closed */ diff --git a/packages/acp-bridge/src/eventBus.test.ts b/packages/acp-bridge/src/eventBus.test.ts index 271219fc20..1850108be5 100644 --- a/packages/acp-bridge/src/eventBus.test.ts +++ b/packages/acp-bridge/src/eventBus.test.ts @@ -65,7 +65,7 @@ describe('EventBus', () => { abort.abort(); }); - it('replay + live: new events follow the replay tail', async () => { + it('replay + live: new events follow the replay tail (with replay_complete sentinel)', async () => { const bus = new EventBus(); bus.publish({ type: 'foo', data: 'a' }); bus.publish({ type: 'foo', data: 'b' }); @@ -75,8 +75,24 @@ describe('EventBus', () => { setTimeout(() => bus.publish({ type: 'foo', data: 'c' }), 5); - const events = await collect(iter, 3); - expect(events.map((e) => e.data)).toEqual(['a', 'b', 'c']); + // The replay loop drains the ring, emits a `replay_complete` + // sentinel (id-less, lets consumers drop catch-up indicators), and + // then live events flow. Sentinel goes AFTER the ring tail so the + // consumer sees historical frames first, then the "you're live now" + // signal, then live events. + const events = await collect(iter, 4); + expect(events.map((e) => e.type)).toEqual([ + 'foo', + 'foo', + 'replay_complete', + 'foo', + ]); + expect(events.map((e) => e.data)).toEqual([ + 'a', + 'b', + expect.objectContaining({ lastEventId: 2, replayedCount: 2 }), + 'c', + ]); abort.abort(); }); @@ -415,12 +431,15 @@ describe('EventBus', () => { const events: BridgeEvent[] = []; for await (const e of iter) { events.push(e); - if (events.length === 11) break; + // 10 replay + 1 replay_complete sentinel + 1 live = 12 total + if (events.length === 12) break; } // The live frame must arrive — NOT a `client_evicted` terminal. expect(events.find((e) => e.type === 'client_evicted')).toBeUndefined(); expect(events.at(-1)?.type).toBe('live'); expect(events.filter((e) => e.type === 'replay')).toHaveLength(10); + // `replay_complete` sentinel signals end-of-replay before live frames. + expect(events.filter((e) => e.type === 'replay_complete')).toHaveLength(1); abort.abort(); }); @@ -490,15 +509,22 @@ describe('EventBus', () => { const out: BridgeEvent[] = []; for await (const e of iter) { out.push(e); - // After the state_resync_required frame (synthetic) + 3 replay - // frames, we're done. - if (out.length === 4) break; + // state_resync_required (synthetic) + 3 replay frames + + // replay_complete sentinel = 5 frames. + if (out.length === 5) break; } // First frame is the synthetic state_resync_required (no id). expect(out[0]?.type).toBe('state_resync_required'); expect(out[0]?.id).toBeUndefined(); // Then the 3 surviving ring frames. - expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]); + expect(out.slice(1, 4).map((e) => e.id)).toEqual([3, 4, 5]); + // The replay_complete sentinel fires at the end of replay even on + // the resync path — `replayedCount` is the actual frames pushed (3), + // NOT `earliestAvailableId - lastEventId` (which would over-count + // across the evicted hole). + expect(out[4]?.type).toBe('replay_complete'); + expect(out[4]?.id).toBeUndefined(); + expect(out[4]?.data).toMatchObject({ replayedCount: 3 }); abort.abort(); }); @@ -518,7 +544,8 @@ describe('EventBus', () => { const out: BridgeEvent[] = []; for await (const e of iter) { out.push(e); - if (out.length === 4) break; + // resync + 3 replay frames + replay_complete = 5. + if (out.length === 5) break; } // First frame is the resync terminal (synthetic, no id). expect(out[0]?.type).toBe('state_resync_required'); @@ -534,7 +561,12 @@ describe('EventBus', () => { // Replay continues after the resync frame (per design — SDK can // compute "what you missed" diff later) — so we still get the // 3 surviving ring frames. - expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]); + expect(out.slice(1, 4).map((e) => e.id)).toEqual([3, 4, 5]); + // replay_complete sentinel closes the replay even when a resync + // gap preceded it; replayedCount counts only the 3 surviving + // frames actually delivered (not the evicted hole). + expect(out[4]?.type).toBe('replay_complete'); + expect(out[4]?.data).toMatchObject({ replayedCount: 3 }); abort.abort(); }); @@ -574,16 +606,22 @@ describe('EventBus', () => { const out: BridgeEvent[] = []; for await (const e of iter) { out.push(e); - if (out.length === 3) break; + // 3 replay frames + 1 replay_complete sentinel = 4 total + if (out.length === 4) break; } expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); - expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + // Replay frames in order, then the sentinel (id-less, signals + // catch-up complete). + expect(out.filter((e) => e.type === 'foo').map((e) => e.id)).toEqual([ + 3, 4, 5, + ]); + expect(out.filter((e) => e.type === 'replay_complete')).toHaveLength(1); abort.abort(); }); it('does NOT emit state_resync_required when ring is empty', async () => { // No publishes yet → earliestInRing is undefined → resync check - // skipped. Subscriber just waits for live events. + // skipped. Subscriber waits for live events. const bus = new EventBus(10); const abort = new AbortController(); const iter = bus.subscribe({ @@ -595,11 +633,18 @@ describe('EventBus', () => { const out: BridgeEvent[] = []; for await (const e of iter) { out.push(e); - if (out.length === 1) break; + // The empty-ring case still emits `replay_complete` (zero + // frames replayed) so consumers always see the catch-up signal + // — then the one live event. 2 total. + if (out.length === 2) break; } - // No resync frame — just the one live event. - expect(out[0]?.type).toBe('foo'); - expect(out[0]?.id).toBe(1); + // No resync frame — but replay_complete (id-less sentinel) + + // the live event. + expect(out.some((e) => e.type === 'state_resync_required')).toBe(false); + expect(out[0]?.type).toBe('replay_complete'); + expect(out[0]?.data).toMatchObject({ replayedCount: 0 }); + expect(out[1]?.type).toBe('foo'); + expect(out[1]?.id).toBe(1); abort.abort(); }); diff --git a/packages/acp-bridge/src/eventBus.ts b/packages/acp-bridge/src/eventBus.ts index b55df438df..7916947d6a 100644 --- a/packages/acp-bridge/src/eventBus.ts +++ b/packages/acp-bridge/src/eventBus.ts @@ -407,6 +407,8 @@ export class EventBus { // `Last-Event-ID` resume contract (the consumer would think they // caught up). If the gap really is enormous, the queue will be // primed with a long backlog the consumer drains at its own pace. + let replayedCount = 0; + let lastReplayedId: number | undefined; for (const e of this.ring) { // The ring only ever contains live events (publish() always // assigns an id before pushing to ring), so `e.id` is never @@ -415,8 +417,38 @@ export class EventBus { // Guard explicitly to keep narrow typing without runtime cost. if (e.id !== undefined && e.id > opts.lastEventId) { queue.forcePush(e); + replayedCount += 1; + lastReplayedId = e.id; } } + // Emit a `replay_complete` sentinel so consumers can deterministically + // drop catch-up indicators. Fires both when replay actually + // delivered frames AND when there was nothing to replay (so the + // consumer always sees the transition from "catching up" to + // "live"). Synthetic frame — no `id` so it doesn't burn a slot in + // the per-session sequence (same pattern as `client_evicted` / + // `state_resync_required`). + // + // Without this sentinel, a consumer attaching via Last-Event-ID + // has no positive signal that replay drained — they have to + // heuristically time out the spinner. The state_resync_required + // path already has its own frame (above); the success path + // needed parity. + // + // `replayedCount` is the actual number of frames force-pushed, + // counted in the loop above — NOT `lastId - opts.lastEventId`, + // which would over-count when the ring has holes (state_resync + // path leaves a gap before the ring's earliest id). + queue.forcePush({ + v: EVENT_SCHEMA_VERSION, + type: 'replay_complete', + data: { + ...(lastReplayedId !== undefined + ? { lastEventId: lastReplayedId } + : {}), + replayedCount, + }, + }); } let disposed = false; diff --git a/packages/sdk-typescript/src/daemon/events.ts b/packages/sdk-typescript/src/daemon/events.ts index c357072c92..7814d8eb56 100644 --- a/packages/sdk-typescript/src/daemon/events.ts +++ b/packages/sdk-typescript/src/daemon/events.ts @@ -73,6 +73,20 @@ const DAEMON_KNOWN_EVENT_TYPE_VALUES = [ // daemons predating F3 omit both event types. 'permission_partial_vote', 'permission_forbidden', + // Cross-client real-time sync (acp-bridge audit, 2026-05-24). + // `prompt_cancelled`: broadcast when a prompt is cancelled (explicit + // `cancelSession` route OR originator SSE disconnect) so peer + // subscribers observe the cancel as a first-class event instead of + // inferring it from the absence of further `agent_message_chunk` + // frames. Carries envelope-level `originatorClientId` (cancelling + // client). Semantic is "cancel requested", not "confirmed". + // `replay_complete`: id-less sentinel emitted at the end of the + // `Last-Event-ID` replay loop so consumers can deterministically + // drop a catch-up indicator. Fires on both the clean-replay and the + // ring-evicted (`state_resync_required`) paths, and even when there + // was nothing to replay (`data.replayedCount === 0`). + 'prompt_cancelled', + 'replay_complete', ] as const; const DAEMON_KNOWN_EVENT_TYPES: ReadonlySet = new Set( diff --git a/packages/sdk-typescript/src/daemon/index.ts b/packages/sdk-typescript/src/daemon/index.ts index 3474b34f2f..8a446c3bfd 100644 --- a/packages/sdk-typescript/src/daemon/index.ts +++ b/packages/sdk-typescript/src/daemon/index.ts @@ -128,6 +128,8 @@ export type { DaemonUiSessionMetadataChangedEvent, DaemonUiShellOutputEvent, DaemonUiStateResyncRequiredEvent, + DaemonUiReplayCompleteEvent, + DaemonUiPromptCancelledEvent, DaemonUiStatusEvent, DaemonUiTextEvent, DaemonUiToolProvenance, diff --git a/packages/sdk-typescript/src/daemon/ui/index.ts b/packages/sdk-typescript/src/daemon/ui/index.ts index 4477d608a7..18cbf0db4e 100644 --- a/packages/sdk-typescript/src/daemon/ui/index.ts +++ b/packages/sdk-typescript/src/daemon/ui/index.ts @@ -90,6 +90,8 @@ export type { DaemonUiSessionApprovalModeChangedEvent, DaemonUiSessionAvailableCommandsEvent, DaemonUiStateResyncRequiredEvent, + DaemonUiReplayCompleteEvent, + DaemonUiPromptCancelledEvent, // Workspace events DaemonUiWorkspaceMemoryChangedEvent, DaemonUiWorkspaceAgentChangedEvent, diff --git a/packages/sdk-typescript/src/daemon/ui/normalizer.ts b/packages/sdk-typescript/src/daemon/ui/normalizer.ts index 86dd77c98f..a324782e42 100644 --- a/packages/sdk-typescript/src/daemon/ui/normalizer.ts +++ b/packages/sdk-typescript/src/daemon/ui/normalizer.ts @@ -155,6 +155,22 @@ export function normalizeDaemonEvent( case 'state_resync_required': return normalizeStateResyncRequired(event, base); + case 'prompt_cancelled': + return [{ ...base, type: 'prompt.cancelled' }]; + + case 'replay_complete': { + const replayedCount = numberField(event.data, 'replayedCount') ?? 0; + const lastReplayedEventId = numberField(event.data, 'lastEventId'); + return [ + { + ...base, + type: 'session.replay_complete', + replayedCount, + ...(lastReplayedEventId !== undefined ? { lastReplayedEventId } : {}), + }, + ]; + } + // ── Session-meta events ────────────────────────────────────────────── case 'session_metadata_updated': return normalizeSessionMetadataUpdated(event, base); diff --git a/packages/sdk-typescript/src/daemon/ui/terminal.ts b/packages/sdk-typescript/src/daemon/ui/terminal.ts index eb4212d86f..5501e7bc3f 100644 --- a/packages/sdk-typescript/src/daemon/ui/terminal.ts +++ b/packages/sdk-typescript/src/daemon/ui/terminal.ts @@ -69,6 +69,14 @@ export function daemonUiEventToTerminalText(event: DaemonUiEvent): string { `${event.reason}: ${formatMissedRange(event.lastDeliveredId, event.earliestAvailableId)}`, '31', ); + case 'session.replay_complete': + return terminalLine( + 'replay-complete', + `caught up (${event.replayedCount} replayed)`, + '2', + ); + case 'prompt.cancelled': + return terminalLine('cancelled', 'prompt cancelled', '33'); case 'workspace.memory.changed': return terminalLine( 'memory', diff --git a/packages/sdk-typescript/src/daemon/ui/transcript.ts b/packages/sdk-typescript/src/daemon/ui/transcript.ts index b7199a229d..6ceeee2689 100644 --- a/packages/sdk-typescript/src/daemon/ui/transcript.ts +++ b/packages/sdk-typescript/src/daemon/ui/transcript.ts @@ -150,9 +150,9 @@ function applyDaemonTranscriptEvent( // console.error) so it surfaces in DevTools but doesn't escalate as // an uncaught issue. Throttled at the call site is the consumer's // job — this fires once per dropped event. - if (typeof console !== 'undefined' && console.warn) { + if (typeof console !== 'undefined') { // eslint-disable-next-line no-console -- intentional diagnostic for awaitingResync silent-drop, per wenshao R5 - console.warn( + console.warn?.( `[daemon-ui] dropping event \`${event.type}\` while awaitingResync; ` + `state may be stale until session reconnect (lastResyncRequired: ${ next.lastResyncRequired @@ -250,6 +250,20 @@ function applyDaemonTranscriptEvent( case 'session.state_resync_required': handleStateResyncRequired(next, event); break; + case 'prompt.cancelled': + // Cross-client: a peer (or this client's own dropped connection) + // cancelled the active prompt. Clear in-flight tool spinners the + // same way an `assistant.done(cancelled)` would, so multi-client + // UIs don't show a tool spinning forever after a peer cancel. + // Idempotent — safe if the daemon also later emits terminal + // tool_call_update frames. + propagateCancellationToInFlightTools(next); + break; + case 'session.replay_complete': + // Sidechannel signal only — consumers read it off the event + // stream (or `selectors`) to drop a catch-up indicator. No + // transcript mutation. + break; case 'workspace.memory.changed': case 'workspace.agent.changed': case 'workspace.tool.toggled': @@ -444,8 +458,7 @@ function upsertToolBlock( // selectors fall back to `parentToolCallId` lookup in that case. const parentBlockId = event.parentToolCallId && - state.toolBlockByCallId[event.parentToolCallId] !== - TRIMMED_TOOL_BLOCK_ID + state.toolBlockByCallId[event.parentToolCallId] !== TRIMMED_TOOL_BLOCK_ID ? state.toolBlockByCallId[event.parentToolCallId] : undefined; const block: DaemonToolTranscriptBlock = { @@ -504,11 +517,7 @@ function upsertToolBlock( // returns and the block sits as visually-pending but currentToolCallId // never points at it. Effective-status keeps the pointer in sync // with what was actually written to the block. - updateCurrentToolPointer( - state, - event.toolCallId, - event.status ?? 'pending', - ); + updateCurrentToolPointer(state, event.toolCallId, event.status ?? 'pending'); clearActiveText(state); } @@ -1169,8 +1178,10 @@ export function selectSubagentChildBlocks( state: DaemonTranscriptState, parentToolCallId: string, ): readonly DaemonToolTranscriptBlock[] { - return getOrBuildChildrenIndex(state.blocks).get(parentToolCallId) ?? - EMPTY_CHILD_LIST; + return ( + getOrBuildChildrenIndex(state.blocks).get(parentToolCallId) ?? + EMPTY_CHILD_LIST + ); } /** diff --git a/packages/sdk-typescript/src/daemon/ui/types.ts b/packages/sdk-typescript/src/daemon/ui/types.ts index 5995493cae..17c50a80b3 100644 --- a/packages/sdk-typescript/src/daemon/ui/types.ts +++ b/packages/sdk-typescript/src/daemon/ui/types.ts @@ -33,6 +33,9 @@ export type DaemonUiEventType = | 'session.approval_mode.changed' | 'session.available_commands' | 'session.state_resync_required' + | 'session.replay_complete' + // Prompt lifecycle (cross-client) + | 'prompt.cancelled' // Workspace events (Wave 3-4) | 'workspace.memory.changed' | 'workspace.agent.changed' @@ -229,6 +232,37 @@ export interface DaemonUiStateResyncRequiredEvent extends DaemonUiEventBase { earliestAvailableId: number; } +/** + * A prompt on the session was cancelled — emitted by the daemon when a + * client calls the cancel route OR when a prompt's originator SSE + * connection drops mid-flight. Lets multi-client UIs surface "cancelled" + * as a first-class event instead of inferring it from the absence of + * further assistant chunks. + * + * Semantic: "cancel requested", not "cancel confirmed" — the daemon + * publishes this before the agent has necessarily wound down. The + * reducer treats it like an `assistant.done(cancelled)` for the purpose + * of clearing in-flight tool spinners. `originatorClientId` (on the + * base) identifies the cancelling client. + */ +export interface DaemonUiPromptCancelledEvent extends DaemonUiEventBase { + type: 'prompt.cancelled'; +} + +/** + * Sentinel signalling that the daemon has finished replaying buffered + * events after a `Last-Event-ID` resume — consumers can drop a + * catch-up indicator deterministically. Fires on both the clean-replay + * and ring-evicted paths, and even when nothing was replayed + * (`replayedCount === 0`). + */ +export interface DaemonUiReplayCompleteEvent extends DaemonUiEventBase { + type: 'session.replay_complete'; + replayedCount: number; + /** Highest event id delivered in the replay, when any frames replayed. */ + lastReplayedEventId?: number; +} + /* ────────────────────────────────────────────────────────────────────────── * Workspace events (Wave 3-4) * ──────────────────────────────────────────────────────────────────────── */ @@ -357,6 +391,9 @@ export type DaemonUiEvent = | DaemonUiSessionApprovalModeChangedEvent | DaemonUiSessionAvailableCommandsEvent | DaemonUiStateResyncRequiredEvent + | DaemonUiReplayCompleteEvent + // Prompt lifecycle (cross-client) + | DaemonUiPromptCancelledEvent // Workspace events | DaemonUiWorkspaceMemoryChangedEvent | DaemonUiWorkspaceAgentChangedEvent diff --git a/packages/sdk-typescript/test/unit/daemonUi.test.ts b/packages/sdk-typescript/test/unit/daemonUi.test.ts index e0effcb76e..bfe976f0ca 100644 --- a/packages/sdk-typescript/test/unit/daemonUi.test.ts +++ b/packages/sdk-typescript/test/unit/daemonUi.test.ts @@ -945,7 +945,9 @@ describe('daemon UI normalizer and transcript reducer', () => { const block = state.blocks[0]; expect(block).toMatchObject({ kind: 'tool', toolKind: 'updated_plan' }); if (block?.kind !== 'tool') throw new Error('expected plan tool block'); - const firstContent = (block.content as Array> | undefined)?.[0]; + const firstContent = ( + block.content as Array> | undefined + )?.[0]; expect(firstContent).toMatchObject({ content: { type: 'text', @@ -1198,8 +1200,7 @@ describe('daemon UI normalizer and transcript reducer', () => { if (originalReportError) { globalWithReportError.reportError = originalReportError; } else { - delete (globalWithReportError as { reportError?: unknown }) - .reportError; + delete (globalWithReportError as { reportError?: unknown }).reportError; } } }); @@ -3777,10 +3778,7 @@ describe('daemon UI subagent nesting — review hardening (R1-R4)', () => { const result = runAdapterConformanceSuite( { reduce: (events) => - reduceDaemonTranscriptEvents( - createDaemonTranscriptState(), - events, - ), + reduceDaemonTranscriptEvents(createDaemonTranscriptState(), events), renderToText: (state: DaemonTranscriptState) => state.blocks .map((b: DaemonTranscriptBlock) => daemonBlockToMarkdown(b)) @@ -4298,10 +4296,9 @@ describe('KNOWN_DEVICE_FLOW_ERROR_KINDS stays in sync with public type', async ( describe('daemonBlockToPlainText forwards opts (wenshao review 4350741340)', () => { it('sanitizes URL on tool preview when opts.sanitizeUrls is set', async () => { - const { - daemonBlockToPlainText, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToPlainText, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); const block = { id: 'b', kind: 'tool' as const, @@ -4331,10 +4328,9 @@ describe('daemonBlockToHtml — additional coverage (wenshao R3 qwen3.7-max)', ( } as const; it('strips token query param + Basic Auth from web_fetch URL when sanitizeUrls:true', async () => { - const { - daemonBlockToHtml, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToHtml, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); const block = { ...baseFields, kind: 'tool' as const, @@ -4356,10 +4352,9 @@ describe('daemonBlockToHtml — additional coverage (wenshao R3 qwen3.7-max)', ( }); it('protocol-validates thumbnailUrl even when sanitizeUrls:false', async () => { - const { - daemonBlockToMarkdown, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToMarkdown, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); const block = { ...baseFields, kind: 'tool' as const, @@ -4562,35 +4557,31 @@ describe('Late permission.resolved after sentinel pruned (wenshao R3 qwen3.7-max describe('ensureSafeImageUrl tightened to data:image/* (audit follow-up)', () => { it('allows http/https/data:image/* but rejects data:text/html', async () => { - const { - daemonBlockToMarkdown, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); - const mkBlock = (thumbnailUrl: string) => - ({ - id: 'b', - kind: 'tool' as const, - toolCallId: 't', - title: 'gen', - status: 'completed', - preview: createDaemonToolPreview( - { prompt: 'p', thumbnailUrl }, - { toolName: 'image_generator', toolKind: 'tool' }, - ), - clientReceivedAt: 1, - createdAt: 1, - updatedAt: 1, - }); + const { daemonBlockToMarkdown, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); + const mkBlock = (thumbnailUrl: string) => ({ + id: 'b', + kind: 'tool' as const, + toolCallId: 't', + title: 'gen', + status: 'completed', + preview: createDaemonToolPreview( + { prompt: 'p', thumbnailUrl }, + { toolName: 'image_generator', toolKind: 'tool' }, + ), + clientReceivedAt: 1, + createdAt: 1, + updatedAt: 1, + }); // https → passthrough - expect(daemonBlockToMarkdown(mkBlock('https://cdn.example.com/x.png'))).toContain( - 'cdn.example.com', - ); + expect( + daemonBlockToMarkdown(mkBlock('https://cdn.example.com/x.png')), + ).toContain('cdn.example.com'); // data:image/png → passthrough expect( - daemonBlockToMarkdown( - mkBlock('data:image/png;base64,iVBORw0KGgo='), - ), + daemonBlockToMarkdown(mkBlock('data:image/png;base64,iVBORw0KGgo=')), ).toContain('data:image/png'); // data:text/html → rejected to '#' expect( @@ -4626,16 +4617,17 @@ describe('R5 review batch — coverage additions', () => { id: 2, v: 1, type: 'auth_device_flow_cancelled', - data: { /* no deviceFlowId */ }, + data: { + /* no deviceFlowId */ + }, } as never); expect(events[0]?.type).toBe('debug'); }); it('sanitizeUrl clears OAuth implicit-grant access_token in #fragment', async () => { - const { - daemonBlockToMarkdown, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToMarkdown, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); const block = { id: 'b', kind: 'tool' as const, @@ -4659,10 +4651,9 @@ describe('R5 review batch — coverage additions', () => { }); it('sanitizeUrl strips AWS / GCP / Azure SAS credential params', async () => { - const { - daemonBlockToMarkdown, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToMarkdown, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); const mkBlock = (url: string) => ({ id: 'b', kind: 'tool' as const, @@ -4679,21 +4670,27 @@ describe('R5 review batch — coverage additions', () => { }); // AWS S3 presigned const aws = daemonBlockToMarkdown( - mkBlock('https://bucket.s3.amazonaws.com/x?AWSAccessKeyId=AKIA_LEAK&Expires=1234&Signature=SIG_LEAK'), + mkBlock( + 'https://bucket.s3.amazonaws.com/x?AWSAccessKeyId=AKIA_LEAK&Expires=1234&Signature=SIG_LEAK', + ), { sanitizeUrls: true }, ); expect(aws).not.toContain('AKIA_LEAK'); expect(aws).not.toContain('SIG_LEAK'); // GCP signed URL const gcp = daemonBlockToMarkdown( - mkBlock('https://storage.googleapis.com/b/o?GoogleAccessId=svc_LEAK@proj.iam.gserviceaccount.com&Expires=999&Signature=GCP_LEAK'), + mkBlock( + 'https://storage.googleapis.com/b/o?GoogleAccessId=svc_LEAK@proj.iam.gserviceaccount.com&Expires=999&Signature=GCP_LEAK', + ), { sanitizeUrls: true }, ); expect(gcp).not.toContain('svc_LEAK'); expect(gcp).not.toContain('GCP_LEAK'); // Azure SAS const az = daemonBlockToMarkdown( - mkBlock('https://acct.blob.core.windows.net/c/x?sv=2020-08-04&se=2026-12-31&sig=AZ_LEAK&sp=r'), + mkBlock( + 'https://acct.blob.core.windows.net/c/x?sv=2020-08-04&se=2026-12-31&sig=AZ_LEAK&sp=r', + ), { sanitizeUrls: true }, ); expect(az).not.toContain('AZ_LEAK'); @@ -4894,10 +4891,9 @@ describe('R6 review batch — recovery flow + pending pointer', () => { describe('R7 review batch — markdown escape + details sanitization', () => { it('escapeMarkdownText escapes < in metadata fields (titles/kinds) for HTML-backed pipelines', async () => { - const { - daemonBlockToMarkdown, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToMarkdown, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); // `escapeMarkdownText` is applied to METADATA fields (title / // toolKind / status) — those are reviewer-untrusted and should // escape `<` to prevent raw HTML pass-through when consumers run @@ -4931,10 +4927,9 @@ describe('R7 review batch — markdown escape + details sanitization', () => { }); it('markdown tool block details strips URL credentials when sanitizeUrls:true', async () => { - const { - daemonBlockToMarkdown, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToMarkdown, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); const block = { id: 'b', kind: 'tool' as const, @@ -4961,10 +4956,9 @@ describe('R7 review batch — markdown escape + details sanitization', () => { }); it('markdown tool block details preserves URLs verbatim when sanitizeUrls:false (back-compat)', async () => { - const { - daemonBlockToMarkdown, - createDaemonToolPreview, - } = await import('../../src/daemon/ui/index.js'); + const { daemonBlockToMarkdown, createDaemonToolPreview } = await import( + '../../src/daemon/ui/index.js' + ); const block = { id: 'b', kind: 'tool' as const, @@ -4975,8 +4969,7 @@ describe('R7 review batch — markdown escape + details sanitization', () => { { url: 'https://api.example.com/v1', method: 'GET' }, { toolName: 'WebFetch', toolKind: 'tool' }, ), - details: - '{\n "url": "https://api.example.com/v1?token=visible"\n}', + details: '{\n "url": "https://api.example.com/v1?token=visible"\n}', clientReceivedAt: 1, createdAt: 1, updatedAt: 1, @@ -4987,3 +4980,104 @@ describe('R7 review batch — markdown escape + details sanitization', () => { expect(md).toContain('token=visible'); }); }); + +describe('cross-client event recognition (prompt_cancelled / replay_complete)', () => { + it('normalizes prompt_cancelled to prompt.cancelled (not debug)', () => { + const events = normalizeDaemonEvent({ + id: 1, + v: 1, + type: 'prompt_cancelled', + originatorClientId: 'client-X', + data: { sessionId: 's1' }, + } as never); + expect(events).toEqual([ + expect.objectContaining({ + type: 'prompt.cancelled', + originatorClientId: 'client-X', + }), + ]); + }); + + it('normalizes replay_complete to session.replay_complete with count', () => { + const events = normalizeDaemonEvent({ + id: 1, + v: 1, + type: 'replay_complete', + data: { replayedCount: 3, lastEventId: 7 }, + } as never); + expect(events[0]).toMatchObject({ + type: 'session.replay_complete', + replayedCount: 3, + lastReplayedEventId: 7, + }); + }); + + it('replay_complete with zero replay (empty ring) normalizes cleanly', () => { + const events = normalizeDaemonEvent({ + id: 1, + v: 1, + type: 'replay_complete', + data: { replayedCount: 0 }, + } as never); + expect(events[0]).toMatchObject({ + type: 'session.replay_complete', + replayedCount: 0, + }); + expect(events[0]).not.toHaveProperty('lastReplayedEventId'); + }); + + it('prompt.cancelled clears in-flight tool spinners in the reducer', () => { + let state = createDaemonTranscriptState({ now: 1 }); + state = reduceDaemonTranscriptEvents( + state, + normalizeDaemonEvent({ + id: 1, + v: 1, + type: 'session_update', + data: { + update: { + sessionUpdate: 'tool_call', + toolCallId: 'running-tool', + title: 'long task', + status: 'running', + }, + }, + } as never), + { now: 2 }, + ); + // Peer cancel arrives. + state = reduceDaemonTranscriptEvents( + state, + normalizeDaemonEvent({ + id: 2, + v: 1, + type: 'prompt_cancelled', + originatorClientId: 'peer', + data: { sessionId: 's1' }, + } as never), + { now: 3 }, + ); + const block = state.blocks.find( + (b): b is Extract => + b.kind === 'tool' && b.toolCallId === 'running-tool', + )!; + expect(block.status).toBe('cancelled'); + expect(state.currentToolCallId).toBeUndefined(); + }); + + it('session.replay_complete is a no-op against blocks', () => { + let state = createDaemonTranscriptState({ now: 1 }); + const before = state.blocks.length; + state = reduceDaemonTranscriptEvents( + state, + normalizeDaemonEvent({ + id: 1, + v: 1, + type: 'replay_complete', + data: { replayedCount: 5, lastEventId: 5 }, + } as never), + { now: 2 }, + ); + expect(state.blocks.length).toBe(before); + }); +});