diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts
index 54f9972e0212..0c30922d7fa1 100644
--- a/packages/opencode/src/server/routes/global.ts
+++ b/packages/opencode/src/server/routes/global.ts
@@ -8,6 +8,7 @@ import { SyncEvent } from "@/sync"
 import { GlobalBus } from "@/bus/global"
 import { AppRuntime } from "@/effect/app-runtime"
 import { AsyncQueue } from "@/util/queue"
+import { MAX_QUEUE_SIZE, WRITE_TIMEOUT_MS, writeSSEWithTimeout } from "@/util/sse"
 import { Instance } from "../../project/instance"
 import { Installation } from "@/installation"
 import { InstallationVersion } from "@/installation/version"
@@ -20,7 +21,7 @@ const log = Log.create({ service: "server" })
 
 export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({}))
 
-async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>) => () => void) {
+async function streamEvents(c: Context, subscribe: (push: (data: string) => void) => () => void) {
   return streamSSE(c, async (stream) => {
     const q = new AsyncQueue<string | null>()
     let done = false
@@ -34,9 +35,21 @@
       }),
     )
 
+    const enqueue = (payload: string) => {
+      if (done) return
+      // Guard against CLOSE_WAIT zombie sockets where `stream.onAbort` never
+      // fires: if the consumer stops draining, the queue grows unbounded.
+      if (q.size >= MAX_QUEUE_SIZE) {
+        log.warn("global event queue overflow, closing stream", { size: q.size })
+        stop()
+        return
+      }
+      q.push(payload)
+    }
 
+    // Send heartbeat every 10s to prevent stalled proxy streams.
     const heartbeat = setInterval(() => {
-      q.push(
+      enqueue(
         JSON.stringify({
           payload: {
             type: "server.heartbeat",
@@ -51,18 +64,21 @@
       done = true
       clearInterval(heartbeat)
       unsub()
-      q.push(null)
+      q.close(null)
       log.info("global event disconnected")
     }
 
-    const unsub = subscribe(q)
+    const unsub = subscribe(enqueue)
     stream.onAbort(stop)
 
     try {
       for await (const data of q) {
        if (data === null) return
-        await stream.writeSSE({ data })
+        await writeSSEWithTimeout(stream, data, WRITE_TIMEOUT_MS).catch((err) => {
+          log.info("global event write failed, closing stream", { error: String(err) })
+          stop()
+        })
      }
    } finally {
      stop()
    }
@@ -127,9 +143,9 @@ export const GlobalRoutes = lazy(() =>
     c.header("X-Accel-Buffering", "no")
     c.header("X-Content-Type-Options", "nosniff")
 
-    return streamEvents(c, (q) => {
+    return streamEvents(c, (push) => {
       async function handler(event: any) {
-        q.push(JSON.stringify(event))
+        push(JSON.stringify(event))
       }
       GlobalBus.on("event", handler)
       return () => GlobalBus.off("event", handler)
diff --git a/packages/opencode/src/server/routes/instance/event.ts b/packages/opencode/src/server/routes/instance/event.ts
index 1d883bd88314..fbd1044e492f 100644
--- a/packages/opencode/src/server/routes/instance/event.ts
+++ b/packages/opencode/src/server/routes/instance/event.ts
@@ -6,6 +6,7 @@ import { Log } from "@/util"
 import { BusEvent } from "@/bus/bus-event"
 import { Bus } from "@/bus"
 import { AsyncQueue } from "@/util/queue"
+import { MAX_QUEUE_SIZE, WRITE_TIMEOUT_MS, writeSSEWithTimeout } from "@/util/sse"
 
 const log = Log.create({ service: "server" })
 
@@ -47,9 +48,21 @@
         }),
       )
 
+      const enqueue = (payload: string) => {
+        if (done) return
+        // Guard against CLOSE_WAIT zombie sockets where `stream.onAbort` never
+        // fires: if the consumer stops draining, the queue grows unbounded.
+        if (q.size >= MAX_QUEUE_SIZE) {
+          log.warn("event queue overflow, closing stream", { size: q.size })
+          stop()
+          return
+        }
+        q.push(payload)
+      }
 
+      // Send heartbeat every 10s to prevent stalled proxy streams.
       const heartbeat = setInterval(() => {
-        q.push(
+        enqueue(
           JSON.stringify({
             type: "server.heartbeat",
             properties: {},
@@ -62,12 +75,12 @@
         done = true
         clearInterval(heartbeat)
         unsub()
-        q.push(null)
+        q.close(null)
         log.info("event disconnected")
       }
 
       const unsub = Bus.subscribeAll((event) => {
-        q.push(JSON.stringify(event))
+        enqueue(JSON.stringify(event))
         if (event.type === Bus.InstanceDisposed.type) {
           stop()
         }
@@ -78,7 +91,10 @@
       try {
         for await (const data of q) {
           if (data === null) return
-          await stream.writeSSE({ data })
+          await writeSSEWithTimeout(stream, data, WRITE_TIMEOUT_MS).catch((err) => {
+            log.info("event write failed, closing stream", { error: String(err) })
+            stop()
+          })
         }
       } finally {
         stop()
diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts
index a1af53fe8f09..01009b911272 100644
--- a/packages/opencode/src/util/queue.ts
+++ b/packages/opencode/src/util/queue.ts
@@ -1,15 +1,37 @@
 export class AsyncQueue<T> implements AsyncIterable<T> {
   private queue: T[] = []
   private resolvers: ((value: T) => void)[] = []
+  private closed = false
+  private closeSentinel?: T
+
+  get size() {
+    return this.queue.length
+  }
 
   push(item: T) {
+    if (this.closed) return
     const resolve = this.resolvers.shift()
     if (resolve) resolve(item)
     else this.queue.push(item)
   }
 
+  /**
+   * Close the queue. Drops any buffered items, resolves any pending consumer
+   * immediately with `sentinel`, and ignores further pushes. Used to unblock
+   * an iterator from the producer side (e.g. on disconnect) without having
+   * to wait for the consumer to drain the backlog.
+   */
+  close(sentinel: T) {
+    if (this.closed) return
+    this.closed = true
+    this.closeSentinel = sentinel
+    this.queue.length = 0
+    while (this.resolvers.length > 0) this.resolvers.shift()!(sentinel)
+  }
+
   async next(): Promise<T> {
     if (this.queue.length > 0) return this.queue.shift()!
+    if (this.closed) return this.closeSentinel as T
     return new Promise((resolve) => this.resolvers.push(resolve))
   }
 
diff --git a/packages/opencode/src/util/sse.ts b/packages/opencode/src/util/sse.ts
new file mode 100644
index 000000000000..0c19cf2cd35b
--- /dev/null
+++ b/packages/opencode/src/util/sse.ts
@@ -0,0 +1,31 @@
+/**
+ * Upper bound on the SSE producer queue. When exceeded, the stream is assumed
+ * to be wedged (e.g. TCP in CLOSE_WAIT where Hono's `onAbort` never fires) and
+ * the caller is expected to close it. Sized to allow bursty traffic while
+ * still catching genuine stalls within seconds.
+ */
+export const MAX_QUEUE_SIZE = 10_000
+
+/**
+ * Max time an individual `writeSSE` is allowed to sit pending before the
+ * stream is considered dead. On a healthy connection each write completes in
+ * milliseconds; this exists to break out of half-closed sockets where the
+ * write neither resolves nor rejects.
+ */
+export const WRITE_TIMEOUT_MS = 30_000
+
+type SSEStream = {
+  writeSSE: (input: { data: string }) => Promise<void>
+}
+
+export function writeSSEWithTimeout(stream: SSEStream, data: string, ms: number) {
+  let timer: ReturnType<typeof setTimeout> | undefined
+  return Promise.race([
+    stream.writeSSE({ data }),
+    new Promise((_, reject) => {
+      timer = setTimeout(() => reject(new Error("sse write timeout")), ms)
+    }),
+  ]).finally(() => {
+    if (timer) clearTimeout(timer)
+  })
+}
diff --git a/packages/opencode/test/util/queue.test.ts b/packages/opencode/test/util/queue.test.ts
new file mode 100644
index 000000000000..c12027e30a66
--- /dev/null
+++ b/packages/opencode/test/util/queue.test.ts
@@ -0,0 +1,103 @@
+import { describe, expect, test } from "bun:test"
+import { AsyncQueue } from "../../src/util/queue"
+
+describe("util.queue", () => {
+  test("buffers items when nothing is consuming", () => {
+    const q = new AsyncQueue<number>()
+    q.push(1)
+    q.push(2)
+    q.push(3)
+    expect(q.size).toBe(3)
+  })
+
+  test("delivers pushed items to a waiting consumer", async () => {
+    const q = new AsyncQueue<number>()
+    const waiter = q.next()
+    q.push(42)
+    expect(await waiter).toBe(42)
+    expect(q.size).toBe(0)
+  })
+
+  test("preserves FIFO order", async () => {
+    const q = new AsyncQueue<number>()
+    q.push(1)
+    q.push(2)
+    q.push(3)
+    expect(await q.next()).toBe(1)
+    expect(await q.next()).toBe(2)
+    expect(await q.next()).toBe(3)
+  })
+
+  test("iterates pushed items as an async iterable until closed", async () => {
+    const q = new AsyncQueue<number | null>()
+    q.push(1)
+    q.push(2)
+    q.push(3)
+
+    const seen: number[] = []
+    const consumer = (async () => {
+      for await (const item of q) {
+        if (item === null) break
+        seen.push(item)
+      }
+    })()
+    // Wait until the buffered items have been drained, then close to unblock
+    // the iterator that is now awaiting the next item.
+    while (q.size > 0) await Promise.resolve()
+    q.close(null)
+    await consumer
+    expect(seen).toEqual([1, 2, 3])
+  })
+
+  test("close() drops buffered items and unblocks waiters", async () => {
+    const q = new AsyncQueue<string | null>()
+    q.push("a")
+    q.push("b")
+    expect(q.size).toBe(2)
+
+    // Attach a waiter that is already past the buffer.
+    q.push("c")
+    const consumed: (string | null)[] = []
+    consumed.push(await q.next()) // "a"
+    consumed.push(await q.next()) // "b"
+    consumed.push(await q.next()) // "c"
+
+    // Now nothing buffered, a waiter is pending.
+    const pending = q.next()
+    q.close(null)
+    expect(await pending).toBeNull()
+    expect(q.size).toBe(0)
+  })
+
+  test("close() ignores subsequent pushes", async () => {
+    const q = new AsyncQueue<number | null>()
+    q.close(null)
+    q.push(1)
+    q.push(2)
+    expect(q.size).toBe(0)
+    expect(await q.next()).toBeNull()
+  })
+
+  test("close() is idempotent", async () => {
+    const q = new AsyncQueue<number | null>()
+    q.push(1)
+    q.close(null)
+    q.close(null)
+    // Still returns the close sentinel because buffer was dropped on first close.
+    expect(await q.next()).toBeNull()
+  })
+
+  test("close() drops buffer so iterator exits immediately", async () => {
+    const q = new AsyncQueue<string | null>()
+    // Simulate a stuck consumer: fill the queue with many items before close.
+    for (let i = 0; i < 100; i++) q.push("item-" + i)
+    q.close(null)
+    // Despite 100 buffered items, iteration should terminate immediately.
+    const seen: string[] = []
+    for await (const item of q) {
+      if (item === null) break
+      seen.push(item)
+    }
+    expect(seen).toEqual([])
+  })
+})
diff --git a/packages/opencode/test/util/sse.test.ts b/packages/opencode/test/util/sse.test.ts
new file mode 100644
index 000000000000..11aa45b0c261
--- /dev/null
+++ b/packages/opencode/test/util/sse.test.ts
@@ -0,0 +1,53 @@
+import { describe, expect, test } from "bun:test"
+import { writeSSEWithTimeout } from "../../src/util/sse"
+
+describe("util.sse", () => {
+  test("resolves when writeSSE resolves in time", async () => {
+    const calls: string[] = []
+    const stream = {
+      writeSSE: async (input: { data: string }) => {
+        calls.push(input.data)
+      },
+    }
+    await writeSSEWithTimeout(stream, "hello", 1000)
+    expect(calls).toEqual(["hello"])
+  })
+
+  test("rejects with timeout error when writeSSE hangs", async () => {
+    const stream = {
+      writeSSE: () => new Promise<void>(() => {}),
+    }
+    const err = await writeSSEWithTimeout(stream, "hello", 50).then(
+      () => null,
+      (e: Error) => e,
+    )
+    expect(err).toBeInstanceOf(Error)
+    expect(err?.message).toBe("sse write timeout")
+  })
+
+  test("clears the timer when the write resolves first", async () => {
+    // If the timer were not cleared it would keep the event loop alive and
+    // eventually reject. We verify by waiting past the nominal timeout and
+    // confirming the promise has already settled with success.
+    const stream = {
+      writeSSE: async () => {},
+    }
+    const p = writeSSEWithTimeout(stream, "hello", 20)
+    await p
+    // Wait past the timeout window to make sure the timer didn't fire later.
+    await new Promise((r) => setTimeout(r, 40))
+  })
+
+  test("surfaces the underlying writeSSE error", async () => {
+    const stream = {
+      writeSSE: async () => {
+        throw new Error("boom")
+      },
+    }
+    const err = await writeSSEWithTimeout(stream, "hello", 1000).then(
+      () => null,
+      (e: Error) => e,
+    )
+    expect(err?.message).toBe("boom")
+  })
+})