Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions packages/acp-bridge/src/bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2029,6 +2029,241 @@ describe('createHttpAcpBridge', () => {
await bridge.shutdown();
});

it('echoes user_message_chunk to ALL session subscribers (cross-client sync)', async () => {
// Cross-client sync fix: a prompt sent by client A must be visible
// to every SSE subscriber of the same session — not just the
// originator. Before the fix, the interactive prompt path forwarded
// straight to the agent without publishing `user_message_chunk` to
// the bus, so peer clients (B, C, ...) never saw A's input.
const factory: ChannelFactory = async () =>
makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abortA = new AbortController();
const abortB = new AbortController();
const iterA = bridge.subscribeEvents(session.sessionId, {
signal: abortA.signal,
});
const iterB = bridge.subscribeEvents(session.sessionId, {
signal: abortB.signal,
});

// Collect the first user_message_chunk each subscriber sees.
const firstUserChunk = async (
iter: AsyncIterable<{
type: string;
data: unknown;
originatorClientId?: string;
}>,
): Promise<{ originatorClientId?: string; data: unknown }> => {
for await (const e of iter) {
if (e.type !== 'session_update') continue;
const update = (e.data as { update?: { sessionUpdate?: string } })
?.update;
if (update?.sessionUpdate === 'user_message_chunk') {
return { originatorClientId: e.originatorClientId, data: e.data };
}
}
throw new Error('no user_message_chunk observed');
};

const aPromise = firstUserChunk(iterA);
const bPromise = firstUserChunk(iterB);

// Client A sends the prompt with its trusted clientId.
await bridge.sendPrompt(
session.sessionId,
{
sessionId: session.sessionId,
prompt: [{ type: 'text', text: 'hello from A' }],
},
undefined,
{ clientId: session.clientId },
);

const [aChunk, bChunk] = await Promise.all([aPromise, bPromise]);

// Both subscribers saw the user input echoed to the bus.
for (const chunk of [aChunk, bChunk]) {
const update = (
chunk.data as {
update: {
sessionUpdate: string;
content: unknown;
_meta?: unknown;
};
}
).update;
expect(update.sessionUpdate).toBe('user_message_chunk');
expect(update.content).toEqual({ type: 'text', text: 'hello from A' });
// Originator stamp present so SDK `suppressOwnUserEcho` can dedup
// on the originator's own UI.
expect(chunk.originatorClientId).toBe(session.clientId);
// Source marker distinguishes the bridge echo from agent content.
expect((update._meta as { source?: string })?.source).toBe(
'bridge-echo',
);
}

abortA.abort();
abortB.abort();
await bridge.shutdown();
});

it('echoes one user_message_chunk per content block (multi-modal)', async () => {
const factory: ChannelFactory = async () =>
makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});

const collected: Array<{ sessionUpdate: string; content: unknown }> = [];
const drain = (async () => {
for await (const e of iter) {
if (e.type !== 'session_update') continue;
const update = (
e.data as { update?: { sessionUpdate?: string; content?: unknown } }
)?.update;
if (update?.sessionUpdate === 'user_message_chunk') {
collected.push({
sessionUpdate: update.sessionUpdate,
content: update.content,
});
if (collected.length === 2) break;
}
}
})();

await bridge.sendPrompt(
session.sessionId,
{
sessionId: session.sessionId,
prompt: [
{ type: 'text', text: 'describe this' },
{ type: 'resource_link', uri: 'file:///x.png', name: 'x.png' },
],
},
undefined,
{ clientId: session.clientId },
);

await drain;
// One echo frame per content block, in order.
expect(collected).toHaveLength(2);
expect(collected[0]?.content).toEqual({
type: 'text',
text: 'describe this',
});
expect(collected[1]?.content).toMatchObject({ type: 'resource_link' });

abort.abort();
await bridge.shutdown();
});

it('broadcasts prompt_cancelled with originator attribution on cancelSession', async () => {
// Cross-client sync: a cancel must surface as a first-class event
// so peer subscribers don't have to infer it from the absence of
// further agent chunks.
const factory: ChannelFactory = async () =>
makeChannel({ promptImpl: () => ({ stopReason: 'end_turn' }) }).channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
const firstCancel = (async () => {
for await (const e of iter) {
if (e.type === 'prompt_cancelled') return e;
}
throw new Error('no prompt_cancelled observed');
})();

await bridge.cancelSession(session.sessionId, undefined, {
clientId: session.clientId,
});

const evt = await firstCancel;
expect(evt.type).toBe('prompt_cancelled');
expect((evt.data as { sessionId: string }).sessionId).toBe(
session.sessionId,
);
expect(evt.originatorClientId).toBe(session.clientId);

abort.abort();
await bridge.shutdown();
});

it('stamps envelope originatorClientId on session_closed', async () => {
const factory: ChannelFactory = async () => makeChannel().channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
const firstClosed = (async () => {
for await (const e of iter) {
if (e.type === 'session_closed') return e;
}
throw new Error('no session_closed observed');
})();

await bridge.closeSession(session.sessionId, {
clientId: session.clientId,
});

const evt = await firstClosed;
// Envelope-level stamp (new) — sibling events use this field.
expect(evt.originatorClientId).toBe(session.clientId);
// Back-compat `data.closedBy` retained.
expect((evt.data as { closedBy?: string }).closedBy).toBe(
session.clientId,
);

abort.abort();
await bridge.shutdown();
});

it('stamps envelope originatorClientId on session_metadata_updated', async () => {
const factory: ChannelFactory = async () => makeChannel().channel;
const bridge = makeBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
const firstMeta = (async () => {
for await (const e of iter) {
if (e.type === 'session_metadata_updated') return e;
}
throw new Error('no session_metadata_updated observed');
})();

bridge.updateSessionMetadata(
session.sessionId,
{ displayName: 'renamed session' },
{ clientId: session.clientId },
);

const evt = await firstMeta;
expect(evt.originatorClientId).toBe(session.clientId);
expect((evt.data as { displayName?: string }).displayName).toBe(
'renamed session',
);

abort.abort();
await bridge.shutdown();
});

it('overrides a stale sessionId in the body with the routing id', async () => {
const handles: ChannelHandle[] = [];
const factory: ChannelFactory = async () => {
Expand Down
Loading