Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions packages/opencode/src/server/routes/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { SyncEvent } from "@/sync"
import { GlobalBus } from "@/bus/global"
import { AppRuntime } from "@/effect/app-runtime"
import { AsyncQueue } from "@/util/queue"
import { MAX_QUEUE_SIZE, WRITE_TIMEOUT_MS, writeSSEWithTimeout } from "@/util/sse"
import { Instance } from "../../project/instance"
import { Installation } from "@/installation"
import { InstallationVersion } from "@/installation/version"
Expand All @@ -20,7 +21,7 @@ const log = Log.create({ service: "server" })

export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({}))

async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>) => () => void) {
async function streamEvents(c: Context, subscribe: (push: (data: string) => void) => () => void) {
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
let done = false
Expand All @@ -34,9 +35,21 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
}),
)

const enqueue = (payload: string) => {
if (done) return
// Guard against CLOSE_WAIT zombie sockets where `stream.onAbort` never
// fires: if the consumer stops draining, the queue grows unbounded.
if (q.size >= MAX_QUEUE_SIZE) {
log.warn("global event queue overflow, closing stream", { size: q.size })
stop()
return
}
q.push(payload)
}

// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
enqueue(
JSON.stringify({
payload: {
type: "server.heartbeat",
Expand All @@ -51,18 +64,21 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
q.close(null)
log.info("global event disconnected")
}

const unsub = subscribe(q)
const unsub = subscribe(enqueue)

stream.onAbort(stop)

try {
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
await writeSSEWithTimeout(stream, data, WRITE_TIMEOUT_MS).catch((err) => {
log.info("global event write failed, closing stream", { error: String(err) })
stop()
})
}
} finally {
stop()
Expand Down Expand Up @@ -127,9 +143,9 @@ export const GlobalRoutes = lazy(() =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")

return streamEvents(c, (q) => {
return streamEvents(c, (push) => {
async function handler(event: any) {
q.push(JSON.stringify(event))
push(JSON.stringify(event))
}
GlobalBus.on("event", handler)
return () => GlobalBus.off("event", handler)
Expand Down
24 changes: 20 additions & 4 deletions packages/opencode/src/server/routes/instance/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Log } from "@/util"
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { AsyncQueue } from "@/util/queue"
import { MAX_QUEUE_SIZE, WRITE_TIMEOUT_MS, writeSSEWithTimeout } from "@/util/sse"

const log = Log.create({ service: "server" })

Expand Down Expand Up @@ -47,9 +48,21 @@ export const EventRoutes = () =>
}),
)

const enqueue = (payload: string) => {
if (done) return
// Guard against CLOSE_WAIT zombie sockets where `stream.onAbort` never
// fires: if the consumer stops draining, the queue grows unbounded.
if (q.size >= MAX_QUEUE_SIZE) {
log.warn("event queue overflow, closing stream", { size: q.size })
stop()
return
}
q.push(payload)
}

// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
enqueue(
JSON.stringify({
type: "server.heartbeat",
properties: {},
Expand All @@ -62,12 +75,12 @@ export const EventRoutes = () =>
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
q.close(null)
log.info("event disconnected")
}

const unsub = Bus.subscribeAll((event) => {
q.push(JSON.stringify(event))
enqueue(JSON.stringify(event))
if (event.type === Bus.InstanceDisposed.type) {
stop()
}
Expand All @@ -78,7 +91,10 @@ export const EventRoutes = () =>
try {
for await (const data of q) {
if (data === null) return
await stream.writeSSE({ data })
await writeSSEWithTimeout(stream, data, WRITE_TIMEOUT_MS).catch((err) => {
log.info("event write failed, closing stream", { error: String(err) })
stop()
})
}
} finally {
stop()
Expand Down
22 changes: 22 additions & 0 deletions packages/opencode/src/util/queue.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
export class AsyncQueue<T> implements AsyncIterable<T> {
private queue: T[] = []
private resolvers: ((value: T) => void)[] = []
private closed = false
private closeSentinel?: T

// Number of items currently buffered and not yet handed to a consumer.
// Producers read this to detect a wedged consumer (growth without draining).
get size() {
return this.queue.length
}

push(item: T) {
  // Pushing into a closed queue is a deliberate no-op.
  if (this.closed) return
  // Hand the item directly to a parked consumer when one is waiting;
  // otherwise buffer it until the next call to next().
  const waiter = this.resolvers.shift()
  if (waiter !== undefined) {
    waiter(item)
  } else {
    this.queue.push(item)
  }
}

/**
 * Shut the queue down from the producer side. All buffered items are
 * discarded, every consumer currently parked in `next()` resolves
 * immediately with `sentinel`, and any later `push()` becomes a no-op.
 * This lets a disconnect handler unblock an iterator without waiting for
 * the consumer to drain the backlog.
 */
close(sentinel: T) {
  if (this.closed) return
  this.closed = true
  this.closeSentinel = sentinel
  // Drop the backlog, then wake every pending waiter (in FIFO order)
  // with the sentinel.
  this.queue.length = 0
  const pending = this.resolvers.splice(0)
  for (const resolve of pending) resolve(sentinel)
}

async next(): Promise<T> {
  // Serve buffered items first so delivery stays FIFO.
  if (this.queue.length > 0) return this.queue.shift()!
  // A closed queue keeps yielding its sentinel so iteration can terminate.
  if (this.closed) return this.closeSentinel as T
  // Nothing available: park the caller until push() or close() wakes it.
  return new Promise<T>((resolve) => {
    this.resolvers.push(resolve)
  })
}

Expand Down
31 changes: 31 additions & 0 deletions packages/opencode/src/util/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
 * Upper bound on the SSE producer queue. When exceeded, the stream is assumed
 * to be wedged (e.g. TCP in CLOSE_WAIT where Hono's `onAbort` never fires) and
 * the caller is expected to close it. Sized to allow bursty traffic while
 * still catching genuine stalls within seconds.
 */
export const MAX_QUEUE_SIZE = 10_000

/**
 * Max time an individual `writeSSE` is allowed to sit pending before the
 * stream is considered dead. On a healthy connection each write completes in
 * milliseconds; this exists to break out of half-closed sockets where the
 * write neither resolves nor rejects.
 */
export const WRITE_TIMEOUT_MS = 30_000

// Minimal structural view of Hono's SSE stream — only the write we use.
type SSEStream = {
  writeSSE: (input: { data: string }) => Promise<void>
}

/**
 * Write one SSE frame, rejecting with `Error("sse write timeout")` if the
 * underlying write does not settle within `ms`. The pending timer is always
 * cleared, whichever side of the race settles first.
 */
export function writeSSEWithTimeout(stream: SSEStream, data: string, ms: number): Promise<void> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const write = stream.writeSSE({ data })
  // If the timeout wins the race, `write` is still pending and may reject
  // later (e.g. when the dead socket finally errors). Attach a no-op handler
  // so that late rejection never surfaces as an unhandled promise rejection;
  // the race itself still propagates an early write failure unchanged.
  write.catch(() => {})
  return Promise.race([
    write,
    new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error("sse write timeout")), ms)
    }),
  ]).finally(() => {
    if (timer !== undefined) clearTimeout(timer)
  })
}
103 changes: 103 additions & 0 deletions packages/opencode/test/util/queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { describe, expect, test } from "bun:test"
import { AsyncQueue } from "../../src/util/queue"

describe("util.queue", () => {
  // Producer side alone: items accumulate when no consumer is attached.
  test("buffers items when nothing is consuming", () => {
    const q = new AsyncQueue<number>()
    q.push(1)
    q.push(2)
    q.push(3)
    expect(q.size).toBe(3)
  })

  test("delivers pushed items to a waiting consumer", async () => {
    const q = new AsyncQueue<number>()
    const waiter = q.next()
    q.push(42)
    expect(await waiter).toBe(42)
    expect(q.size).toBe(0)
  })

  test("preserves FIFO order", async () => {
    const q = new AsyncQueue<number>()
    q.push(1)
    q.push(2)
    q.push(3)
    expect(await q.next()).toBe(1)
    expect(await q.next()).toBe(2)
    expect(await q.next()).toBe(3)
  })

  test("iterates pushed items as an async iterable until closed", async () => {
    const q = new AsyncQueue<number | null>()
    q.push(1)
    q.push(2)
    q.push(3)

    const seen: number[] = []
    const consumer = (async () => {
      for await (const item of q) {
        if (item === null) break
        seen.push(item)
      }
    })()
    // Wait until the buffered items have been drained, then close to unblock
    // the iterator that is now awaiting the next item.
    while (q.size > 0) await Promise.resolve()
    q.close(null)
    await consumer
    expect(seen).toEqual([1, 2, 3])
  })

  test("close() drops buffered items and unblocks waiters", async () => {
    const q = new AsyncQueue<string | null>()
    q.push("a")
    q.push("b")
    expect(q.size).toBe(2)

    q.push("c")
    const consumed: (string | null)[] = []
    consumed.push(await q.next())
    consumed.push(await q.next())
    consumed.push(await q.next())
    // Everything pushed so far must come back, in FIFO order.
    expect(consumed).toEqual(["a", "b", "c"])

    // Nothing buffered now and a waiter is pending: close() must resolve it
    // immediately with the sentinel.
    const pending = q.next()
    q.close(null)
    expect(await pending).toBeNull()
    expect(q.size).toBe(0)
  })

  test("close() ignores subsequent pushes", async () => {
    const q = new AsyncQueue<number | null>()
    q.close(null)
    q.push(1)
    q.push(2)
    expect(q.size).toBe(0)
    expect(await q.next()).toBeNull()
  })

  test("close() is idempotent", async () => {
    const q = new AsyncQueue<number | null>()
    q.push(1)
    q.close(null)
    q.close(null)
    // Still returns the close sentinel because buffer was dropped on first close.
    expect(await q.next()).toBeNull()
  })

  test("close() drops buffer so iterator exits immediately", async () => {
    const q = new AsyncQueue<string | null>()
    // Simulate a stuck consumer: fill the queue with many items before close.
    for (let i = 0; i < 100; i++) q.push("item-" + i)
    q.close(null)
    // Despite 100 buffered items, iteration should terminate immediately.
    const seen: string[] = []
    for await (const item of q) {
      if (item === null) break
      seen.push(item)
    }
    expect(seen).toEqual([])
  })
})
53 changes: 53 additions & 0 deletions packages/opencode/test/util/sse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { describe, expect, test } from "bun:test"
import { writeSSEWithTimeout } from "../../src/util/sse"

describe("util.sse", () => {
  // Happy path: a write that settles within the window passes the payload
  // through to the underlying stream unchanged.
  test("resolves when writeSSE resolves in time", async () => {
    const calls: string[] = []
    const stream = {
      writeSSE: async (input: { data: string }) => {
        calls.push(input.data)
      },
    }
    await writeSSEWithTimeout(stream, "hello", 1000)
    expect(calls).toEqual(["hello"])
  })

  // A write that never settles must be converted into a rejection so the
  // caller can tear the stream down instead of hanging forever.
  test("rejects with timeout error when writeSSE hangs", async () => {
    const stream = {
      // Promise that never resolves nor rejects — simulates a dead socket.
      writeSSE: () => new Promise<void>(() => {}),
    }
    const err = await writeSSEWithTimeout(stream, "hello", 50).then(
      () => null,
      (e: Error) => e,
    )
    expect(err).toBeInstanceOf(Error)
    expect(err?.message).toBe("sse write timeout")
  })

  test("clears the timer when the write resolves first", async () => {
    // If the timer were not cleared it would keep the event loop alive and
    // eventually reject. We verify by waiting past the nominal timeout and
    // confirming the promise has already settled with success.
    // NOTE(review): there is no explicit assertion here — the test relies on
    // the runner failing on a late unhandled rejection from the stale timer.
    const stream = {
      writeSSE: async () => {},
    }
    const p = writeSSEWithTimeout(stream, "hello", 20)
    await p
    // Wait past the timeout window to make sure the timer didn't fire later.
    await new Promise((r) => setTimeout(r, 40))
  })

  // An immediate write failure must win the race and propagate unchanged,
  // not be masked by the timeout wrapper.
  test("surfaces the underlying writeSSE error", async () => {
    const stream = {
      writeSSE: async () => {
        throw new Error("boom")
      },
    }
    const err = await writeSSEWithTimeout(stream, "hello", 1000).then(
      () => null,
      (e: Error) => e,
    )
    expect(err?.message).toBe("boom")
  })
})
Loading