Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 303 additions & 0 deletions packages/acp-bridge/src/bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2029,6 +2029,309 @@ describe('createHttpAcpBridge', () => {
await bridge.shutdown();
});

it('echoes user_message_chunk to ALL session subscribers (cross-client sync)', async () => {
// Cross-client sync fix: a prompt sent by client A must be visible
// to every SSE subscriber of the same session — not just the
// originator. Before the fix, the interactive prompt path forwarded
// straight to the agent without publishing `user_message_chunk` to
// the bus, so peer clients (B, C, ...) never saw A's input.
const factory: ChannelFactory = async () =>
makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abortA = new AbortController();
const abortB = new AbortController();
const iterA = bridge.subscribeEvents(session.sessionId, {
signal: abortA.signal,
});
const iterB = bridge.subscribeEvents(session.sessionId, {
signal: abortB.signal,
});

// Collect the first user_message_chunk each subscriber sees.
const firstUserChunk = async (
iter: AsyncIterable<{
type: string;
data: unknown;
originatorClientId?: string;
}>,
): Promise<{ originatorClientId?: string; data: unknown }> => {
for await (const e of iter) {
if (e.type !== 'session_update') continue;
const update = (e.data as { update?: { sessionUpdate?: string } })
?.update;
if (update?.sessionUpdate === 'user_message_chunk') {
return { originatorClientId: e.originatorClientId, data: e.data };
}
}
throw new Error('no user_message_chunk observed');
};

const aPromise = firstUserChunk(iterA);
const bPromise = firstUserChunk(iterB);

// Client A sends the prompt with its trusted clientId.
await bridge.sendPrompt(
session.sessionId,
{
sessionId: session.sessionId,
prompt: [{ type: 'text', text: 'hello from A' }],
},
undefined,
{ clientId: session.clientId },
);

const [aChunk, bChunk] = await Promise.all([aPromise, bPromise]);

// Both subscribers saw the user input echoed to the bus.
for (const chunk of [aChunk, bChunk]) {
const update = (
chunk.data as {
update: {
sessionUpdate: string;
content: unknown;
_meta?: unknown;
};
}
).update;
expect(update.sessionUpdate).toBe('user_message_chunk');
expect(update.content).toEqual({ type: 'text', text: 'hello from A' });
// Originator stamp present so SDK `suppressOwnUserEcho` can dedup
// on the originator's own UI.
expect(chunk.originatorClientId).toBe(session.clientId);
// Source marker distinguishes the bridge echo from agent content.
expect((update._meta as { source?: string })?.source).toBe(
'bridge-echo',
);
}

abortA.abort();
abortB.abort();
await bridge.shutdown();
});

it('echoes one user_message_chunk per content block (multi-modal)', async () => {
const factory: ChannelFactory = async () =>
makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});

const collected: Array<{ sessionUpdate: string; content: unknown }> = [];
const drain = (async () => {
for await (const e of iter) {
if (e.type !== 'session_update') continue;
const update = (
e.data as { update?: { sessionUpdate?: string; content?: unknown } }
)?.update;
if (update?.sessionUpdate === 'user_message_chunk') {
collected.push({
sessionUpdate: update.sessionUpdate,
content: update.content,
});
if (collected.length === 2) break;
}
}
})();

await bridge.sendPrompt(
session.sessionId,
{
sessionId: session.sessionId,
prompt: [
{ type: 'text', text: 'describe this' },
{ type: 'resource_link', uri: 'file:///x.png', name: 'x.png' },
],
},
undefined,
{ clientId: session.clientId },
);

await drain;
// One echo frame per content block, in order.
expect(collected).toHaveLength(2);
expect(collected[0]?.content).toEqual({
type: 'text',
text: 'describe this',
});
expect(collected[1]?.content).toMatchObject({ type: 'resource_link' });

abort.abort();
await bridge.shutdown();
});

it('broadcasts prompt_cancelled with originator attribution on cancelSession', async () => {
// Cross-client sync: a cancel must surface as a first-class event
// so peer subscribers don't have to infer it from the absence of
// further agent chunks.
const factory: ChannelFactory = async () =>
makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
const firstCancel = (async () => {
for await (const e of iter) {
if (e.type === 'prompt_cancelled') return e;
}
throw new Error('no prompt_cancelled observed');
})();

await bridge.cancelSession(session.sessionId, undefined, {
clientId: session.clientId,
});

const evt = await firstCancel;
expect(evt.type).toBe('prompt_cancelled');
expect((evt.data as { sessionId: string }).sessionId).toBe(
session.sessionId,
);
expect(evt.originatorClientId).toBe(session.clientId);

abort.abort();
await bridge.shutdown();
});

it('broadcasts prompt_cancelled to peers when the originator SSE aborts mid-prompt', async () => {
// Cross-client sync: client disconnect (tab close / network drop /
// laptop sleep) is the most common cancel trigger in production.
// The `sendPrompt` `onAbort` path must publish `prompt_cancelled`
// to peer subscribers — not just the explicit `cancelSession`
// route. A regression here would silently re-open the gap.
let releasePrompt: (() => void) | undefined;
const factory: ChannelFactory = async () =>
makeChannel({
// Hang the prompt so it stays in-flight while we abort.
promptImpl: async () => {
await new Promise<void>((res) => {
releasePrompt = res;
});
return { stopReason: 'cancelled' };
},
}).channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

// Peer subscriber (a DIFFERENT client watching the same session).
const peerAbort = new AbortController();
const peerIter = bridge.subscribeEvents(session.sessionId, {
signal: peerAbort.signal,
});
const peerCancel = (async () => {
for await (const e of peerIter) {
if (e.type === 'prompt_cancelled') return e;
}
throw new Error('peer never saw prompt_cancelled');
})();

// Originator sends the (hanging) prompt, then its SSE/HTTP signal
// aborts mid-flight (connection dropped).
const promptAbort = new AbortController();
const promptPromise = bridge
.sendPrompt(
session.sessionId,
{
sessionId: session.sessionId,
prompt: [{ type: 'text', text: 'long running' }],
},
promptAbort.signal,
{ clientId: session.clientId },
)
.catch(() => {
// AbortError is expected — the originator's connection dropped.
});

// Give the queue worker a tick to start the prompt, then abort.
await new Promise((r) => setTimeout(r, 10));
promptAbort.abort();

const evt = await peerCancel;
expect(evt.type).toBe('prompt_cancelled');
expect((evt.data as { sessionId: string }).sessionId).toBe(
session.sessionId,
);
// Attributed to the prompt's originator (whose connection dropped).
expect(evt.originatorClientId).toBe(session.clientId);

// Let the hung promptImpl settle so shutdown doesn't wait on it.
releasePrompt?.();
await promptPromise;
peerAbort.abort();
await bridge.shutdown();
});

it('stamps envelope originatorClientId on session_closed', async () => {
const factory: ChannelFactory = async () => makeChannel().channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
const firstClosed = (async () => {
for await (const e of iter) {
if (e.type === 'session_closed') return e;
}
throw new Error('no session_closed observed');
})();

await bridge.closeSession(session.sessionId, {
clientId: session.clientId,
});

const evt = await firstClosed;
// Envelope-level stamp (new) — sibling events use this field.
expect(evt.originatorClientId).toBe(session.clientId);
// Back-compat `data.closedBy` retained.
expect((evt.data as { closedBy?: string }).closedBy).toBe(
session.clientId,
);

abort.abort();
await bridge.shutdown();
});

it('stamps envelope originatorClientId on session_metadata_updated', async () => {
const factory: ChannelFactory = async () => makeChannel().channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
const firstMeta = (async () => {
for await (const e of iter) {
if (e.type === 'session_metadata_updated') return e;
}
throw new Error('no session_metadata_updated observed');
})();

bridge.updateSessionMetadata(
session.sessionId,
{ displayName: 'renamed session' },
{ clientId: session.clientId },
);

const evt = await firstMeta;
expect(evt.originatorClientId).toBe(session.clientId);
expect((evt.data as { displayName?: string }).displayName).toBe(
'renamed session',
);

abort.abort();
await bridge.shutdown();
});

it('overrides a stale sessionId in the body with the routing id', async () => {
const handles: ChannelHandle[] = [];
const factory: ChannelFactory = async () => {
Expand Down
Loading