diff --git a/packages/opencode/src/bus/global.ts b/packages/opencode/src/bus/global.ts index e751b59faf5..ba1d109193f 100644 --- a/packages/opencode/src/bus/global.ts +++ b/packages/opencode/src/bus/global.ts @@ -10,3 +10,4 @@ export const GlobalBus = new EventEmitter<{ }, ] }>() +GlobalBus.setMaxListeners(100) diff --git a/packages/opencode/src/server/adapter.bun.ts b/packages/opencode/src/server/adapter.bun.ts index 3e70b97e8af..6528aa9e2ab 100644 --- a/packages/opencode/src/server/adapter.bun.ts +++ b/packages/opencode/src/server/adapter.bun.ts @@ -11,7 +11,11 @@ export const adapter: Adapter = { const args = { fetch: app.fetch, hostname: opts.hostname, - idleTimeout: 0, + // Default is 10s which is too aggressive for SSE connections. + // 0 disables the timeout entirely — dead connections (CLOSE_WAIT) are + // never cleaned up, causing unbounded memory growth. 120s gives the + // cleanup chain enough time to fire while still bounding leak duration. + idleTimeout: 120, websocket: ws.websocket, } as const const start = (port: number) => { diff --git a/packages/opencode/src/server/instance/event.ts b/packages/opencode/src/server/instance/event.ts index 5d631d954eb..3ddf7b264bd 100644 --- a/packages/opencode/src/server/instance/event.ts +++ b/packages/opencode/src/server/instance/event.ts @@ -75,11 +75,25 @@ export const EventRoutes = () => }) stream.onAbort(stop) + // Second abort path: req.raw.signal fires before stream.onAbort on direct + // connections (~2ms earlier), and provides an independent cleanup path when + // the responseReadable.cancel() chain is broken (e.g. reverse proxy). + // Hono only registers this on Bun 1.0/1.1 (isOldBunVersion gate); we add + // it unconditionally so Bun 1.2+ is also covered. + c.req.raw.signal.addEventListener("abort", stop) try { for await (const data of q) { if (data === null) return - await stream.writeSSE({ data }) + try { + await stream.writeSSE({ data }) + } catch { + // Hono 4.x StreamingApi.write() has an empty catch — this block + // never fires on the current version. Kept for forward compatibility + // in case a future Hono version propagates write errors. + stop() + return + } } } finally { stop() diff --git a/packages/opencode/src/server/instance/global.ts b/packages/opencode/src/server/instance/global.ts index d462a07f745..40a61ca415a 100644 --- a/packages/opencode/src/server/instance/global.ts +++ b/packages/opencode/src/server/instance/global.ts @@ -57,11 +57,25 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue const unsub = subscribe(q) stream.onAbort(stop) + // Second abort path: req.raw.signal fires before stream.onAbort on direct + // connections (~2ms earlier), and provides an independent cleanup path when + // the responseReadable.cancel() chain is broken (e.g. reverse proxy). + // Hono only registers this on Bun 1.0/1.1 (isOldBunVersion gate); we add + // it unconditionally so Bun 1.2+ is also covered. + c.req.raw.signal.addEventListener("abort", stop) try { for await (const data of q) { if (data === null) return - await stream.writeSSE({ data }) + try { + await stream.writeSSE({ data }) + } catch { + // Hono 4.x StreamingApi.write() has an empty catch — this block + // never fires on the current version. Kept for forward compatibility + // in case a future Hono version propagates write errors. + stop() + return + } } } finally { stop() diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f0..5537791db52 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -2,10 +2,18 @@ export class AsyncQueue implements AsyncIterable { private queue: T[] = [] private resolvers: ((value: T) => void)[] = [] + constructor(private limit = 1000) {} + push(item: T) { const resolve = this.resolvers.shift() - if (resolve) resolve(item) - else this.queue.push(item) + if (resolve) { + resolve(item) + } else { + if (this.queue.length >= this.limit) { + this.queue.shift() + } + this.queue.push(item) + } } async next(): Promise { diff --git a/packages/opencode/test/util/queue.test.ts b/packages/opencode/test/util/queue.test.ts new file mode 100644 index 00000000000..ddf9333bf85 --- /dev/null +++ b/packages/opencode/test/util/queue.test.ts @@ -0,0 +1,70 @@ +import { describe, expect, test } from "bun:test" +import { AsyncQueue } from "../../src/util/queue" + +describe("AsyncQueue", () => { + test("basic FIFO order", async () => { + const q = new AsyncQueue() + q.push(1) + q.push(2) + q.push(3) + expect(await q.next()).toBe(1) + expect(await q.next()).toBe(2) + expect(await q.next()).toBe(3) + }) + + test("drops oldest entry when limit is reached", async () => { + const q = new AsyncQueue(3) + q.push(1) + q.push(2) + q.push(3) + q.push(4) // should drop 1 + expect(await q.next()).toBe(2) + expect(await q.next()).toBe(3) + expect(await q.next()).toBe(4) + }) + + test("queue length never exceeds limit", () => { + const limit = 5 + const q = new AsyncQueue(limit) + for (let i = 0; i < 100; i++) q.push(i) + // drain and count + let count = 0 + while ((q as any).queue.length > 0) { + ;(q as any).queue.shift() + count++ + } + expect(count).toBeLessThanOrEqual(limit) + }) + + test("item delivered directly to waiting resolver bypasses limit", async () => { + const q = new AsyncQueue(2) + // consumer is already waiting + const result = q.next() + q.push(99) + expect(await result).toBe(99) + // internal queue should still be empty + expect((q as any).queue.length).toBe(0) + }) + + test("null sentinel terminates async iteration", async () => { + const q = new AsyncQueue() + q.push(1) + q.push(2) + q.push(null) + const collected: number[] = [] + for await (const item of q) { + if (item === null) break + collected.push(item) + } + expect(collected).toEqual([1, 2]) + }) + + test("zombie scenario: 10000 pushes with no consumer stays bounded", () => { + const limit = 100 + const q = new AsyncQueue(limit) + for (let i = 0; i < 10_000; i++) q.push(i) + expect((q as any).queue.length).toBe(limit) + // most recent items are retained + expect((q as any).queue[(q as any).queue.length - 1]).toBe(9999) + }) +})