Skip to content
145 changes: 142 additions & 3 deletions packages/acp-bridge/src/eventBus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,20 +274,30 @@ describe('EventBus', () => {
// A `lastEventId: 0` resume with a queue cap larger than the ring
// collects exactly 8000 live frames; ids start at 2 because id=1
// was the one shifted out of the ring.
//
// #4175 F4 prereq: `lastEventId: 0` + earliest-id-in-ring = 2
// crosses the eviction-detection threshold (earliest > last + 1),
// so an extra synthetic `state_resync_required` frame is emitted
// FIRST. The filter below restricts to live ids, which excludes
// the synthetic (no id), so the original "8000 live frames"
// invariant is preserved.
const abort = new AbortController();
const iter = bus.subscribe({
lastEventId: 0,
maxQueued: 9000,
signal: abort.signal,
});
const events = await collect(iter, 8000);
// Collect 8001 frames now: 1 synthetic resync + 8000 live.
const events = await collect(iter, 8001);
abort.abort();
const liveIds = events
.filter((e) => e.id !== undefined)
.map((e) => e.id as number);
expect(liveIds).toHaveLength(8000);
expect(liveIds[0]).toBe(2);
expect(liveIds[liveIds.length - 1]).toBe(8001);
// The synthetic resync frame is the first one.
expect(events[0]?.type).toBe('state_resync_required');
});

it('eviction detaches the abort listener from a stalled consumer (BmJT1)', async () => {
Expand Down Expand Up @@ -480,9 +490,138 @@ describe('EventBus', () => {
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length === 3) break;
// After the state_resync_required frame (synthetic) + 3 replay
// frames, we're done.
if (out.length === 4) break;
}
expect(out.map((e) => e.id)).toEqual([3, 4, 5]);
// First frame is the synthetic state_resync_required (no id).
expect(out[0]?.type).toBe('state_resync_required');
expect(out[0]?.id).toBeUndefined();
// Then the 3 surviving ring frames.
expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]);
abort.abort();
});

describe('state_resync_required (#4175 F4 prereq, Ilya0527 issue #15)', () => {
it('emits state_resync_required when lastEventId is past the ring head', async () => {
// Setup: ring holds 3, ids 1..5 published → ring contains [3,4,5].
// Consumer reconnects with Last-Event-ID: 1 → events 2 was evicted.
// Daemon must emit state_resync_required FIRST so SDK reducer
// knows its state is stale before applying any replay frames.
const bus = new EventBus(3);
for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i });
const abort = new AbortController();
const iter = bus.subscribe({
lastEventId: 1,
signal: abort.signal,
});
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length === 4) break;
}
// First frame is the resync terminal (synthetic, no id).
expect(out[0]?.type).toBe('state_resync_required');
expect(out[0]?.id).toBeUndefined();
const data = out[0]?.data as {
reason: string;
lastDeliveredId: number;
earliestAvailableId: number;
};
expect(data.reason).toBe('ring_evicted');
expect(data.lastDeliveredId).toBe(1);
expect(data.earliestAvailableId).toBe(3); // event 2 was evicted
// Replay continues after the resync frame (per design — SDK can
// compute "what you missed" diff later) — so we still get the
// 3 surviving ring frames.
expect(out.slice(1).map((e) => e.id)).toEqual([3, 4, 5]);
abort.abort();
});

it('does NOT emit state_resync_required when lastEventId is in the ring', async () => {
// Consumer's lastEventId is well within the ring → no gap → no
// resync needed.
const bus = new EventBus(10);
for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i });
const abort = new AbortController();
const iter = bus.subscribe({
lastEventId: 2,
signal: abort.signal,
});
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length === 3) break;
}
// No resync frame — just the 3 replay frames (ids 3, 4, 5).
expect(out.map((e) => e.id)).toEqual([3, 4, 5]);
expect(out.some((e) => e.type === 'state_resync_required')).toBe(false);
abort.abort();
});

it('does NOT emit state_resync_required at the exact boundary (lastEventId === earliest - 1)', async () => {
// Boundary: ring's earliest id is N, lastEventId is N-1.
// No gap → no resync. Off-by-one guard.
const bus = new EventBus(3);
for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i });
// Ring is now [3, 4, 5]. lastEventId=2 means "I have 1 and 2";
// next expected is 3, which IS in the ring. No gap.
const abort = new AbortController();
const iter = bus.subscribe({
lastEventId: 2,
signal: abort.signal,
});
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length === 3) break;
}
expect(out.some((e) => e.type === 'state_resync_required')).toBe(false);
expect(out.map((e) => e.id)).toEqual([3, 4, 5]);
abort.abort();
});

it('does NOT emit state_resync_required when ring is empty', async () => {
// No publishes yet → earliestInRing is undefined → resync check
// skipped. Subscriber just waits for live events.
const bus = new EventBus(10);
const abort = new AbortController();
const iter = bus.subscribe({
lastEventId: 5,
signal: abort.signal,
});
// Publish one live event AFTER subscribe to confirm the stream works.
setTimeout(() => bus.publish({ type: 'foo', data: 1 }), 0);
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length === 1) break;
}
// No resync frame — just the one live event.
expect(out[0]?.type).toBe('foo');
expect(out[0]?.id).toBe(1);
abort.abort();
});

it('does NOT emit state_resync_required when no lastEventId is provided (fresh subscribe)', async () => {
// First-time subscriber has no prior state to resync — resync
// would be meaningless. Check the no-lastEventId branch is
// skipped entirely.
const bus = new EventBus(3);
for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i });
const abort = new AbortController();
const iter = bus.subscribe({ signal: abort.signal });
// Live-only — publish one event after subscribe to give the
// iterator something to yield.
setTimeout(() => bus.publish({ type: 'foo', data: 99 }), 0);
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length === 1) break;
}
expect(out[0]?.type).toBe('foo');
expect(out.some((e) => e.type === 'state_resync_required')).toBe(false);
abort.abort();
});
});
});
39 changes: 39 additions & 0 deletions packages/acp-bridge/src/eventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,45 @@ export class EventBus {
this.subs.add(sub);

if (opts.lastEventId !== undefined) {
// Detect ring eviction on resume (#4175 F4 prereq, Ilya0527
// issue #15): if the earliest event still in the ring has
// `id > lastEventId + 1`, then events between `lastEventId + 1`
// and `earliestInRing - 1` were evicted before the consumer
// reconnected — the consumer's reducer has a gap it doesn't
// know about. Pre-fix the resume silently succeeded ("you
// caught up!") even though the SDK reducer's state was now
// diverged from the daemon's truth.
//
// Emit `state_resync_required` as a TERMINAL synthetic frame
Comment thread
doudouOUC marked this conversation as resolved.
Outdated
// (no `id` — same pattern as `client_evicted` so it doesn't
// burn a slot in the per-session monotonic sequence other
// subscribers observe). The SDK reducer treats this as "your
// state is stale; call loadSession before applying any further
// deltas" — see `awaitingResync` flag in the SDK reducer.
//
// Replay continues after the resync frame (per design): the
// SDK reducer will auto-skip delta application until
// loadSession clears the flag, but the frames stay on the
// wire so SDK has the option to compute a "what you missed"
// diff later. This is network-friendly (no extra reconnect)
// and matches the existing `client_evicted` semantics
// (terminal-on-close, but consumer can still drain queued
// frames first).
const earliestInRing = this.ring[0]?.id;
if (
earliestInRing !== undefined &&
earliestInRing > opts.lastEventId + 1
) {
queue.forcePush({
Comment thread
doudouOUC marked this conversation as resolved.
v: EVENT_SCHEMA_VERSION,
type: 'state_resync_required',
data: {
reason: 'ring_evicted',
lastDeliveredId: opts.lastEventId,
earliestAvailableId: earliestInRing,
},
});
}
// Force-push replay frames so they bypass the per-subscriber size
// cap. The cap protects against a slow live consumer; replay is
// already historical and silently dropping it would undermine the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ describe('HistoryReplayer', () => {
rawInput: { path: '/test.ts' },
_meta: {
toolName: 'read_file',
// #4175 F4 prereq — ToolCallEmitter now stamps provenance
// on every tool_call / tool_call_update event so the UI can
// dispatch on builtin / mcp / subagent without string-
// matching toolName.
provenance: 'builtin',
timestamp: toEpochMs(record.timestamp),
},
}),
Expand Down Expand Up @@ -314,6 +319,8 @@ describe('HistoryReplayer', () => {
rawOutput: 'File contents here',
_meta: {
toolName: 'read_file',
// #4175 F4 prereq — provenance stamped on update events too.
provenance: 'builtin',
timestamp: toEpochMs(record.timestamp),
},
});
Expand Down
Loading