diff --git a/bun.lock b/bun.lock index 9d31221..589474a 100644 --- a/bun.lock +++ b/bun.lock @@ -80,6 +80,7 @@ }, "devDependencies": { "@ageflow/mcp-server": "workspace:*", + "@ageflow/server-sqlite": "workspace:*", "@modelcontextprotocol/sdk": "^1.0.0", "@types/node": "^22.0.0", "vitest": "^2.1.0", @@ -103,7 +104,7 @@ }, "packages/cli": { "name": "@ageflow/cli", - "version": "0.5.5", + "version": "0.6.0", "bin": { "agentwf": "./dist/bin.js", }, @@ -112,7 +113,9 @@ "@ageflow/executor": "^0.7.0", "@ageflow/learning": "^0.5.0", "@ageflow/learning-sqlite": "^0.4.1", - "@ageflow/mcp-server": "^0.5.0", + "@ageflow/mcp-server": "^0.7.0", + "@ageflow/server": "^0.6.0", + "@ageflow/server-sqlite": "^0.2.0", "boxen": "^8.0.0", "chalk": "^5.3.0", "commander": "^12.0.0", @@ -138,7 +141,7 @@ }, "packages/dev-workflow": { "name": "@ageflow/dev-workflow", - "version": "0.0.15", + "version": "0.0.16", "dependencies": { "@ageflow/core": "^0.6.0", "@ageflow/executor": "^0.7.0", @@ -203,16 +206,18 @@ }, "packages/mcp-server": { "name": "@ageflow/mcp-server", - "version": "0.5.1", + "version": "0.7.0", "dependencies": { "@ageflow/core": "^0.6.0", "@ageflow/executor": "^0.7.0", - "@ageflow/server": "^0.4.4", + "@ageflow/server": "^0.6.0", + "@ageflow/server-sqlite": "^0.2.0", "@modelcontextprotocol/sdk": "^1.0.0", "zod-to-json-schema": "^3.23.0", }, "devDependencies": { "@ageflow/testing": "workspace:*", + "@types/bun": "^1.3.12", "@types/node": "^22.0.0", "vitest": "^2.1.0", "zod": "^3.23.0", @@ -272,7 +277,7 @@ }, "packages/server": { "name": "@ageflow/server", - "version": "0.4.5", + "version": "0.6.0", "dependencies": { "@ageflow/core": "^0.6.0", "@ageflow/executor": "^0.7.0", @@ -283,6 +288,18 @@ "zod": "^3.23.0", }, }, + "packages/server-sqlite": { + "name": "@ageflow/server-sqlite", + "version": "0.2.0", + "dependencies": { + "@ageflow/server": "^0.6.0", + }, + "devDependencies": { + "@types/bun": "^1.3.12", + "@types/node": "^22.0.0", + "vitest": "^2.1.0", + }, + }, "packages/testing": { "name": "@ageflow/testing", "version": "0.3.8", @@ -338,6 +355,8 @@ "@ageflow/server": ["@ageflow/server@workspace:packages/server"], + "@ageflow/server-sqlite": ["@ageflow/server-sqlite@workspace:packages/server-sqlite"], + "@ageflow/testing": ["@ageflow/testing@workspace:packages/testing"], "@biomejs/biome": ["@biomejs/biome@1.9.4", "", { "optionalDependencies": { "@biomejs/cli-darwin-arm64": "1.9.4", "@biomejs/cli-darwin-x64": "1.9.4", "@biomejs/cli-linux-arm64": "1.9.4", "@biomejs/cli-linux-arm64-musl": "1.9.4", "@biomejs/cli-linux-x64": "1.9.4", "@biomejs/cli-linux-x64-musl": "1.9.4", "@biomejs/cli-win32-arm64": "1.9.4", "@biomejs/cli-win32-x64": "1.9.4" }, "bin": { "biome": "bin/biome" } }, "sha512-1rkd7G70+o9KkTn5KLmDYXihGoTaIGO9PIIN2ZB7UJxFrWw04CZHPYiMRjYsaDvVV7hP1dYNRLxSANLaBFGpog=="], diff --git a/docs/superpowers/specs/2026-04-16-mcp-async-design.md b/docs/superpowers/specs/2026-04-16-mcp-async-design.md index e380c3e..ccf6a20 100644 --- a/docs/superpowers/specs/2026-04-16-mcp-async-design.md +++ b/docs/superpowers/specs/2026-04-16-mcp-async-design.md @@ -47,15 +47,16 @@ decides how to reuse them and what the MCP tool surface looks like. progress/elicitation as today. In job mode, elicitation surfaces via `get_workflow_status.currentTask` — no out-of-band prompt because the originating client may be gone. -- **Job registry is in-process.** Matches `@ageflow/server` today. An - interface hook (`RunStore`) is sketched for future persistent backends. +- **Job registry defaults to in-process, with optional persistence.** + By default jobs use an in-memory store. With `--job-db ` the + registry persists snapshots to SQLite and hydrates known jobs on startup. ## Non-goals - **No distributed jobs.** `jobId` is valid only on the server that created it. Horizontal scale requires sticky routing — out of scope. -- **No persistence across restarts.** Jobs live in memory; restart drops - them. Durable jobs → future work (see "Future: RunStore"). +- **No distributed persistence / replication.** Durable snapshots are local + to a single server instance (for example SQLite on local disk). - **No job prioritization / queueing.** Single-run `BUSY` lock preserved (§5). First caller wins; second caller gets `BUSY`. - **No new HITL mechanism.** Existing `hitl-bridge`; only **surfacing** @@ -405,7 +406,8 @@ input type is structurally identical to the sync tool's input type Restated for emphasis, since issue #18 is deliberately narrow: - **No distributed jobs.** Jobs are single-instance only. -- **No persistence across server restart.** In-memory `RunRegistry` only. +- **No distributed persistence across server restart.** Restart recovery is + supported only when a durable local `RunStore` backend is configured. - **No job prioritization / queueing.** Single `BUSY` lock — same policy as sync mode. - **No `list_jobs` / `wait_for_job` bulk APIs.** v2 if ever requested. @@ -430,8 +432,8 @@ Restated for emphasis, since issue #18 is deliberately narrow: ## Open follow-ups / future work -- **`RunStore` persistence.** File-backed or SQLite-backed - `RunRegistry` for jobs that survive server restart. +- **Additional durable backends.** Add Redis/Postgres-grade `RunStore` + adapters where restart recovery must survive host replacement. - **Webhook / push notifications.** Optional "call me at URL X when job finishes" so polling isn't required for clients that can accept callbacks. diff --git a/docs/superpowers/specs/2026-04-16-server-execution-design.md b/docs/superpowers/specs/2026-04-16-server-execution-design.md index b9bf2f9..0a81dd0 100644 --- a/docs/superpowers/specs/2026-04-16-server-execution-design.md +++ b/docs/superpowers/specs/2026-04-16-server-execution-design.md @@ -44,8 +44,9 @@ transports are all re-expressed on top of it. - **No bundled HTTP server.** We do not ship Express / Fastify middleware. Framework integrations live in userland or future packages. -- **No persistence.** Runs live in process memory. Restarting the server - drops in-flight runs. Durable runs are a v0.2+ feature. +- **In-memory by default; persistence is pluggable.** Without a `RunStore`, + runs live in process memory. With a durable `RunStore` backend, run + snapshots can survive restart. - **No distributed execution.** A `runId` is only valid on the instance that created it. Horizontal scale requires sticky sessions or external state, out of scope here. @@ -146,7 +147,8 @@ behavior unchanged). So CLI keeps prompting on TTY exactly like today. ### Run registry -`@ageflow/server` owns a `RunRegistry` — an in-memory `Map`. +`@ageflow/server` owns a `RunRegistry` (active handles) and can mirror +run snapshots through a pluggable `RunStore` backend. ```ts interface RunHandle { diff --git a/examples/mcp-server/package.json b/examples/mcp-server/package.json index 33cd33a..2f403b7 100644 --- a/examples/mcp-server/package.json +++ b/examples/mcp-server/package.json @@ -17,6 +17,7 @@ }, "devDependencies": { "@ageflow/mcp-server": "workspace:*", + "@ageflow/server-sqlite": "workspace:*", "@modelcontextprotocol/sdk": "^1.0.0", "@types/node": "^22.0.0", "vitest": "^2.1.0", diff --git a/packages/cli/package.json b/packages/cli/package.json index 478c777..ec98ca9 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@ageflow/cli", - "version": "0.5.5", + "version": "0.6.0", "description": "CLI for ageflow \u2014 agentwf run / validate / dry-run / init", "homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/cli", "type": "module", @@ -26,7 +26,9 @@ "@ageflow/executor": "^0.7.0", "@ageflow/learning": "^0.5.0", "@ageflow/learning-sqlite": "^0.4.1", - "@ageflow/mcp-server": "^0.5.0", + "@ageflow/mcp-server": "^0.7.0", + "@ageflow/server": "^0.6.0", + "@ageflow/server-sqlite": "^0.2.0", "chalk": "^5.3.0", "ora": "^8.0.0", "boxen": "^8.0.0", diff --git a/packages/cli/src/__tests__/mcp-serve.test.ts b/packages/cli/src/__tests__/mcp-serve.test.ts index 2ebff3b..52b5486 100644 --- a/packages/cli/src/__tests__/mcp-serve.test.ts +++ b/packages/cli/src/__tests__/mcp-serve.test.ts @@ -216,6 +216,16 @@ describe("parseMcpServeArgs: async mode flags (#18)", () => { expect(parsed.jobCheckpointTtlMs).toBe(900_000); }); + it("parses --job-db ", () => { + const parsed = parseMcpServeArgs([ + "wf.ts", + "--async", + "--job-db", + "/tmp/jobs.sqlite", + ]); + expect(parsed.jobDb).toBe("/tmp/jobs.sqlite"); + }); + it("rejects --job-ttl with no value", () => { expect(() => parseMcpServeArgs(["wf.ts", "--async", "--job-ttl"])).toThrow( /requires/, @@ -239,4 +249,10 @@ describe("parseMcpServeArgs: async mode flags (#18)", () => { parseMcpServeArgs(["wf.ts", "--checkpoint-ttl", "1000"]), ).toThrow(/requires --async/); }); + + it("rejects --job-db without --async", () => { + expect(() => + parseMcpServeArgs(["wf.ts", "--job-db", "/tmp/jobs.sqlite"]), + ).toThrow(/requires --async/); + }); }); diff --git a/packages/cli/src/commands/mcp-serve.ts b/packages/cli/src/commands/mcp-serve.ts index fbeb5d4..04389f2 100644 --- a/packages/cli/src/commands/mcp-serve.ts +++ b/packages/cli/src/commands/mcp-serve.ts @@ -48,6 +48,8 @@ export interface McpServeArgs { readonly jobTtlMs?: number; /** Override default 1-hour checkpoint TTL in ms (--checkpoint-ttl ). */ readonly jobCheckpointTtlMs?: number; + /** Persist async job registry to a SQLite database (--job-db ). */ + readonly jobDb?: string; /** Use Streamable HTTP transport instead of stdio (--http). */ readonly http?: boolean; /** HTTP port (--port , required with --http). */ @@ -90,6 +92,7 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { let asyncMode: boolean | undefined = undefined; let jobTtlMs: number | undefined = undefined; let jobCheckpointTtlMs: number | undefined = undefined; + let jobDb: string | undefined = undefined; let httpMode: boolean | undefined = undefined; let httpPort: number | undefined = undefined; let httpHost: string | undefined = undefined; @@ -215,6 +218,15 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { break; } + case "--job-db": { + const val = args[++i]; + if (val === undefined || val.startsWith("-")) { + throw new Error("--job-db requires a path argument"); + } + jobDb = val; + break; + } + case "--http": httpMode = true; break; @@ -259,9 +271,11 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { if ( asyncMode !== true && - (jobTtlMs !== undefined || jobCheckpointTtlMs !== undefined) + (jobTtlMs !== undefined || + jobCheckpointTtlMs !== undefined || + jobDb !== undefined) ) { - throw new Error("--job-ttl / --checkpoint-ttl requires --async"); + throw new Error("--job-ttl / --checkpoint-ttl / --job-db requires --async"); } if (httpPort !== undefined && httpMode !== true) { @@ -302,6 +316,7 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { ...(asyncMode !== undefined ? { async: asyncMode } : {}), ...(jobTtlMs !== undefined ? { jobTtlMs } : {}), ...(jobCheckpointTtlMs !== undefined ? { jobCheckpointTtlMs } : {}), + ...(jobDb !== undefined ? { jobDb } : {}), ...(httpMode !== undefined ? { http: httpMode } : {}), ...(httpPort !== undefined ? { port: httpPort } : {}), ...(httpHost !== undefined ? { httpHost } : {}), @@ -381,6 +396,7 @@ async function runMcpServe(rawArgv: string[]): Promise { ...(parsed.jobCheckpointTtlMs !== undefined ? { jobCheckpointTtlMs: parsed.jobCheckpointTtlMs } : {}), + ...(parsed.jobDb !== undefined ? { jobDbPath: parsed.jobDb } : {}), }); if (parsed.http === true) { @@ -464,6 +480,7 @@ export function registerMcpCommand(program: Command): void { " --async enable async job mode (5 extra tools)\n" + " --job-ttl job TTL in ms (default: 1800000, requires --async)\n" + " --checkpoint-ttl checkpoint TTL in ms (default: 3600000, requires --async)\n" + + " --job-db persist async job registry to SQLite (requires --async)\n" + " --http use Streamable HTTP transport instead of stdio\n" + " --port HTTP port (required with --http)\n" + " --host HTTP bind address (default: 127.0.0.1, requires --http)\n" + diff --git a/packages/dev-workflow/pipelines/release.ts b/packages/dev-workflow/pipelines/release.ts index a6aab51..41bb31a 100644 --- a/packages/dev-workflow/pipelines/release.ts +++ b/packages/dev-workflow/pipelines/release.ts @@ -23,6 +23,7 @@ const PUBLISH_ORDER = [ "@ageflow/runner-anthropic", "@ageflow/testing", "@ageflow/server", + "@ageflow/server-sqlite", "@ageflow/mcp-server", "@ageflow/learning", "@ageflow/learning-sqlite", diff --git a/packages/mcp-server/README.md b/packages/mcp-server/README.md index fc3c567..ed57f88 100644 --- a/packages/mcp-server/README.md +++ b/packages/mcp-server/README.md @@ -89,8 +89,9 @@ agentwf mcp serve ./workflow.ts --async --checkpoint-ttl 7200000 ### Known limitations -- **No persistence** — the job registry is in-memory only. Restarting the - server loses all job state. +- **Durability is opt-in** — by default the job registry is in-memory. + Use `--job-db ` to persist async job snapshots to SQLite and + recover known jobs after restart. - **Single-instance** — the registry is not shared across processes. Running multiple server processes will have independent job stores. - **Single BUSY lock** — only one `start_*` call can be in-flight at a time. diff --git a/packages/mcp-server/package.json b/packages/mcp-server/package.json index 95ec2f2..5bb861e 100644 --- a/packages/mcp-server/package.json +++ b/packages/mcp-server/package.json @@ -1,6 +1,6 @@ { "name": "@ageflow/mcp-server", - "version": "0.5.1", + "version": "0.7.0", "description": "Expose ageflow workflows as MCP tools (stdio transport, progress streaming, HITL via elicitation).", "homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/mcp-server", "type": "module", @@ -18,12 +18,14 @@ "dependencies": { "@ageflow/core": "^0.6.0", "@ageflow/executor": "^0.7.0", - "@ageflow/server": "^0.4.4", + "@ageflow/server": "^0.6.0", + "@ageflow/server-sqlite": "^0.2.0", "@modelcontextprotocol/sdk": "^1.0.0", "zod-to-json-schema": "^3.23.0" }, "devDependencies": { "@ageflow/testing": "workspace:*", + "@types/bun": "^1.3.12", "@types/node": "^22.0.0", "vitest": "^2.1.0", "zod": "^3.23.0" diff --git a/packages/mcp-server/src/__tests__/integration/async-mode.persistence.test.ts b/packages/mcp-server/src/__tests__/integration/async-mode.persistence.test.ts new file mode 100644 index 0000000..f4370d6 --- /dev/null +++ b/packages/mcp-server/src/__tests__/integration/async-mode.persistence.test.ts @@ -0,0 +1,187 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { defineAgent, defineWorkflow } from "@ageflow/core"; +import { describe, expect, it } from "vitest"; +import { z } from "zod"; +import { createSingleWorkflowServer } from "../../server.js"; + +const completedAgent = defineAgent({ + runner: "completed-stub", + input: z.object({ q: z.string() }), + output: z.object({ a: z.string() }), + prompt: () => "p", +}); + +const completedWorkflow = defineWorkflow({ + name: "completed_workflow", + tasks: { t: { agent: completedAgent, input: { q: "seed" } } }, +}); + +const checkpointAgent = defineAgent({ + runner: "checkpoint-stub", + input: z.object({ q: z.string() }), + output: z.object({ a: z.string() }), + prompt: () => "p", + hitl: { mode: "checkpoint", message: "approve?" }, +}); + +const checkpointWorkflow = defineWorkflow({ + name: "checkpoint_workflow", + tasks: { t: { agent: checkpointAgent, input: { q: "seed" } } }, +}); + +function makeDbPath(prefix: string): { readonly dir: string; readonly dbPath: string } { + const dir = mkdtempSync(path.join(os.tmpdir(), `ageflow-${prefix}-`)); + return { dir, dbPath: path.join(dir, "jobs.sqlite") }; +} + +async function waitForState( + server: ReturnType, + state: string, + jobId: string, +): Promise { + for (let i = 0; i < 100; i += 1) { + const res = await server.callTool("get_workflow_status", { jobId }); + if (!res.isError) { + const current = res.structuredContent as { state?: string }; + if (current.state === state) return; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error(`timed out waiting for state=${state}`); +} + +describe("async mode persistence", () => { + it("recovers a completed job after restart", async () => { + const { dir, dbPath } = makeDbPath("completed"); + const server1 = createSingleWorkflowServer({ + workflow: completedWorkflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobDbPath: dbPath, + }); + server1._testRunExecutor = async (args) => { + const input = args as { q: string }; + return { a: `done:${input.q}` }; + }; + + try { + const start = await server1.callTool("start_completed_workflow", { + q: "persist", + }); + expect(start.isError).toBe(false); + if (start.isError) throw new Error("start failed"); + const jobId = (start.structuredContent as { jobId: string }).jobId; + + await waitForState(server1, "done", jobId); + server1.dispose?.(); + + const server2 = createSingleWorkflowServer({ + workflow: completedWorkflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobDbPath: dbPath, + }); + try { + const status = await server2.callTool("get_workflow_status", { jobId }); + expect(status.isError).toBe(false); + if (!status.isError) { + expect(status.structuredContent).toMatchObject({ state: "done" }); + } + + const result = await server2.callTool("get_workflow_result", { jobId }); + expect(result.isError).toBe(false); + if (!result.isError) { + expect(result.structuredContent).toMatchObject({ + state: "done", + output: { a: "done:persist" }, + }); + } + } finally { + server2.dispose?.(); + } + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + it("recovers an awaiting-checkpoint job after restart", async () => { + const { dir, dbPath } = makeDbPath("checkpoint"); + const server1 = createSingleWorkflowServer({ + workflow: checkpointWorkflow, + cliCeilings: {}, + hitlStrategy: "elicit", + async: true, + jobDbPath: dbPath, + }); + server1._testRunExecutor = async (args) => { + const input = args as { q: string }; + return { a: `done:${input.q}` }; + }; + + try { + const start = await server1.callTool("start_checkpoint_workflow", { + q: "pause", + }); + expect(start.isError).toBe(false); + if (start.isError) throw new Error("start failed"); + const jobId = (start.structuredContent as { jobId: string }).jobId; + + await waitForState(server1, "awaiting-checkpoint", jobId); + server1.dispose?.(); + + const server2 = createSingleWorkflowServer({ + workflow: checkpointWorkflow, + cliCeilings: {}, + hitlStrategy: "elicit", + async: true, + jobDbPath: dbPath, + }); + try { + const status = await server2.callTool("get_workflow_status", { jobId }); + expect(status.isError).toBe(false); + if (!status.isError) { + expect(status.structuredContent).toMatchObject({ + state: "awaiting-checkpoint", + }); + } + + const result = await server2.callTool("get_workflow_result", { jobId }); + expect(result.isError).toBe(false); + if (!result.isError) { + expect(result.structuredContent).toMatchObject({ pending: true }); + } + + let resume = await server2.callTool("resume_workflow", { + jobId, + approved: true, + }); + for (let i = 0; i < 20 && resume.isError; i += 1) { + await new Promise((resolve) => setTimeout(resolve, 10)); + resume = await server2.callTool("resume_workflow", { + jobId, + approved: true, + }); + } + expect(resume.isError).toBe(false); + + await waitForState(server2, "done", jobId); + const done = await server2.callTool("get_workflow_result", { jobId }); + expect(done.isError).toBe(false); + if (!done.isError) { + expect(done.structuredContent).toMatchObject({ + state: "done", + output: { a: "done:pause" }, + }); + } + } finally { + server2.dispose?.(); + } + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/mcp-server/src/__tests__/integration/async-mode.test.ts b/packages/mcp-server/src/__tests__/integration/async-mode.test.ts index d53e88d..e1137af 100644 --- a/packages/mcp-server/src/__tests__/integration/async-mode.test.ts +++ b/packages/mcp-server/src/__tests__/integration/async-mode.test.ts @@ -1,3 +1,6 @@ +import { mkdtemp, rm } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import { defineAgent, defineWorkflow, @@ -9,12 +12,89 @@ import type { RunnerSpawnResult, WorkflowDef, } from "@ageflow/core"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { z } from "zod"; import { ErrorCode } from "../../errors.js"; +import { createSqliteJobStore } from "../../index.js"; import type { McpToolResult } from "../../server.js"; import { ASYNC_OBSERVER_TOOL_NAMES, createMcpServer } from "../../server.js"; +type SqliteRow = { + readonly payload: string; + readonly runId: string; + readonly state: string; + readonly lastEventAt: number; +}; + +const sqliteStores = new Map>(); + +vi.mock("bun:sqlite", () => { + class Database { + constructor(private readonly dbPath: string) {} + + query(sql: string) { + const normalized = sql.trim().replace(/\s+/g, " "); + const getStore = () => { + let store = sqliteStores.get(this.dbPath); + if (!store) { + store = new Map(); + sqliteStores.set(this.dbPath, store); + } + return store; + }; + + return { + run: (params?: Record) => { + const store = getStore(); + if (normalized.startsWith("INSERT INTO jobs")) { + const row: SqliteRow = { + runId: String(params?.$runId), + state: String(params?.$state), + lastEventAt: Number(params?.$lastEventAt), + payload: String(params?.$payload), + }; + store.set(row.runId, row); + } else if ( + normalized.startsWith("DELETE FROM jobs WHERE runId = $runId") + ) { + store.delete(String(params?.$runId)); + } + return { changes: 1 }; + }, + get: (params?: Record) => { + const store = getStore(); + if (normalized.startsWith("SELECT payload FROM jobs WHERE runId")) { + return store.get(String(params?.$runId)); + } + return undefined; + }, + all: () => { + const store = getStore(); + if (normalized.startsWith("SELECT payload FROM jobs ORDER BY")) { + return [...store.values()] + .sort((a, b) => b.lastEventAt - a.lastEventAt) + .map((row) => ({ payload: row.payload })); + } + if ( + normalized.startsWith("SELECT runId, state, lastEventAt FROM jobs") + ) { + return [...store.values()].map((row) => ({ + runId: row.runId, + state: row.state, + lastEventAt: row.lastEventAt, + })); + } + return []; + }, + }; + } + + close() {} + } + + return { Database }; +}); + const agent = defineAgent({ runner: "fake", input: z.object({ q: z.string() }), @@ -27,6 +107,17 @@ const workflow = defineWorkflow({ tasks: { t: { agent, input: { q: "hi" } } }, }); +async function makeJobDbPath(prefix: string): Promise<{ + readonly dir: string; + readonly dbPath: string; +}> { + const dir = await mkdtemp(path.join(os.tmpdir(), `ageflow-${prefix}-`)); + return { + dir, + dbPath: path.join(dir, "jobs.sqlite"), + }; +} + describe("async mode: listTools (#18)", () => { it("returns 1 tool when async is omitted (default)", async () => { const h = createMcpServer({ @@ -123,6 +214,10 @@ describe("async mode: inflight lock (#18)", () => { }); const first = h.callTool("start_ask", { q: "1" }); + for (let i = 0; i < 50 && typeof release !== "function"; i += 1) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + expect(typeof release).toBe("function"); const second = await h.callTool("start_ask", { q: "2" }); expect(second.isError).toBe(true); if (second.isError) expect(second.structuredContent.errorCode).toBe("BUSY"); @@ -216,6 +311,219 @@ describe("async mode: cancel_workflow (#18)", () => { }); }); +describe("async mode: sqlite job store recovery", () => { + it("recovers a completed job after recreating the server", async () => { + const { dir, dbPath } = await makeJobDbPath("recovery"); + const jobStore1 = await createSqliteJobStore(dbPath); + const server1 = createMcpServer({ + workflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobStore: jobStore1, + }); + server1._testRunExecutor = async (args) => { + const input = args as { q: string }; + return { a: `done:${input.q}` }; + }; + + try { + const startRes = await server1.callTool("start_ask", { q: "persist" }); + expect(startRes.isError).toBe(false); + if (startRes.isError) throw new Error("start failed"); + + const jobId = (startRes.structuredContent as { jobId: string }).jobId; + let state = "running"; + for (let i = 0; i < 50 && state === "running"; i++) { + await new Promise((r) => setTimeout(r, 10)); + const statusRes = await server1.callTool("get_workflow_status", { + jobId, + }); + if (!statusRes.isError) { + state = (statusRes.structuredContent as { state: string }).state; + } + } + expect(state).toBe("done"); + + const result1 = await server1.callTool("get_workflow_result", { jobId }); + expect(result1.isError).toBe(false); + + server1.dispose?.(); + jobStore1.close(); + + const jobStore2 = await createSqliteJobStore(dbPath); + const server2 = createMcpServer({ + workflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobStore: jobStore2, + }); + + try { + const statusRes = await server2.callTool("get_workflow_status", { + jobId, + }); + expect(statusRes.isError).toBe(false); + if (!statusRes.isError) { + expect(statusRes.structuredContent).toMatchObject({ state: "done" }); + } + + const resultRes = await server2.callTool("get_workflow_result", { + jobId, + }); + expect(resultRes.isError).toBe(false); + if (!resultRes.isError) { + expect( + (resultRes.structuredContent as { output: { a: string } }).output.a, + ).toBe("done:persist"); + } + } finally { + server2.dispose?.(); + jobStore2.close(); + } + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + it("keeps an in-flight job queryable after recreating the server", async () => { + const { dir, dbPath } = await makeJobDbPath("inflight"); + const jobStore1 = await createSqliteJobStore(dbPath); + const server1 = createMcpServer({ + workflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobStore: jobStore1, + }); + let release!: (value: { a: string }) => void; + server1._testRunExecutor = () => + new Promise<{ a: string }>((resolve) => { + release = resolve; + }); + + try { + const startRes = await server1.callTool("start_ask", { q: "hold" }); + expect(startRes.isError).toBe(false); + if (startRes.isError) throw new Error("start failed"); + const jobId = (startRes.structuredContent as { jobId: string }).jobId; + + const jobStore2 = await createSqliteJobStore(dbPath); + const server2 = createMcpServer({ + workflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobStore: jobStore2, + }); + + try { + const statusRes = await server2.callTool("get_workflow_status", { + jobId, + }); + expect(statusRes.isError).toBe(false); + if (!statusRes.isError) { + expect(statusRes.structuredContent).toMatchObject({ + state: "running", + }); + } + + const resultRes = await server2.callTool("get_workflow_result", { + jobId, + }); + expect(resultRes.isError).toBe(false); + if (!resultRes.isError) { + expect(resultRes.structuredContent).toMatchObject({ pending: true }); + } + + release({ a: "finished" }); + await new Promise((r) => setTimeout(r, 0)); + } finally { + server2.dispose?.(); + jobStore2.close(); + } + } finally { + server1.dispose?.(); + jobStore1.close(); + await rm(dir, { recursive: true, force: true }); + } + }); + + it("reaps expired persistent rows after reopening the same database", async () => { + const { dir, dbPath } = await makeJobDbPath("ttl"); + const jobStore1 = await createSqliteJobStore(dbPath); + const server1 = createMcpServer({ + workflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobTtlMs: 50, + jobCheckpointTtlMs: 50, + jobStore: jobStore1, + }); + server1._testRunExecutor = async () => ({ a: "ttl" }); + + try { + const startRes = await server1.callTool("start_ask", { q: "expire" }); + expect(startRes.isError).toBe(false); + if (startRes.isError) throw new Error("start failed"); + + const jobId = (startRes.structuredContent as { jobId: string }).jobId; + let state = "running"; + for (let i = 0; i < 50 && state === "running"; i++) { + await new Promise((r) => setTimeout(r, 10)); + const statusRes = await server1.callTool("get_workflow_status", { + jobId, + }); + if (!statusRes.isError) { + state = (statusRes.structuredContent as { state: string }).state; + } + } + expect(state).toBe("done"); + + server1.dispose?.(); + jobStore1.close(); + + const jobStore2 = await createSqliteJobStore(dbPath); + const server2 = createMcpServer({ + workflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + jobTtlMs: 50, + jobCheckpointTtlMs: 50, + jobStore: jobStore2, + }); + + try { + await new Promise((r) => setTimeout(r, 200)); + + const statusRes = await server2.callTool("get_workflow_status", { + jobId, + }); + expect(statusRes.isError).toBe(true); + if (statusRes.isError) { + expect(statusRes.structuredContent.errorCode).toBe("JOB_NOT_FOUND"); + } + + const resultRes = await server2.callTool("get_workflow_result", { + jobId, + }); + expect(resultRes.isError).toBe(true); + if (resultRes.isError) { + expect(resultRes.structuredContent.errorCode).toBe("JOB_NOT_FOUND"); + } + } finally { + server2.dispose?.(); + jobStore2.close(); + } + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); +}); + describe("async mode: JOB_NOT_FOUND (#18)", () => { it.each([ "get_workflow_status", diff --git a/packages/mcp-server/src/index.ts b/packages/mcp-server/src/index.ts index 229cc4a..b828e2d 100644 --- a/packages/mcp-server/src/index.ts +++ b/packages/mcp-server/src/index.ts @@ -29,6 +29,12 @@ export type { McpToolResult, RunWorkflowFn, } from "./server.js"; +export { createSqliteJobStore } from "./job-store-sqlite.js"; +export { + InMemoryJobStore, + type JobStore, + type PersistedJob, +} from "./job-store.js"; export { startStdioTransport } from "./stdio-transport.js"; export type { StdioTransportOptions } from "./stdio-transport.js"; export type { CliCeilings, EffectiveCeilings, HitlStrategy } from "./types.js"; diff --git a/packages/mcp-server/src/job-dispatch.ts b/packages/mcp-server/src/job-dispatch.ts index 55d5777..a12bfb6 100644 --- a/packages/mcp-server/src/job-dispatch.ts +++ b/packages/mcp-server/src/job-dispatch.ts @@ -1,13 +1,15 @@ import type { + RunHandle, RunnerSpawnArgs, RunnerSpawnResult, WorkflowDef, WorkflowEvent, } from "@ageflow/core"; import { registerRunner, unregisterRunner } from "@ageflow/core"; -import { type Runner, createRunner } from "@ageflow/server"; +import { type RunStore, type Runner, createRunner } from "@ageflow/server"; import { ErrorCode, McpServerError, formatErrorResult } from "./errors.js"; import { JobEventRecorder } from "./job-event-recorder.js"; +import { type PersistedJob, isExpired, toPersistedJob } from "./job-store.js"; import type { McpToolResult } from "./server.js"; import type { ToolDefinition } from "./tool-registry.js"; import type { EffectiveCeilings } from "./types.js"; @@ -15,9 +17,12 @@ import type { EffectiveCeilings } from "./types.js"; export interface JobDispatchContext { readonly runner: Runner; readonly recorder: JobEventRecorder; + readonly store: RunStore; readonly workflow: WorkflowDef; readonly tool: ToolDefinition; // sync tool (for output validation) readonly jobTools: readonly ToolDefinition[]; // full job-tool array + readonly jobTtlMs: number; + readonly jobCheckpointTtlMs: number; /** Executor injection hook from tests (mirrors sync path's _testRunExecutor). */ testRunExecutor?: (input: unknown) => Promise; /** @@ -32,6 +37,60 @@ export interface JobDispatchContext { readonly releaseInflight: () => void; } +function sweepStore(ctx: JobDispatchContext): void { + const now = Date.now(); + for (const snapshot of ctx.store.list()) { + if (isExpired(snapshot, now, ctx.jobTtlMs, ctx.jobCheckpointTtlMs)) { + ctx.store.delete(snapshot.runId); + ctx.recorder.forget(snapshot.runId); + } + } +} + +function getLiveSnapshot( + ctx: JobDispatchContext, + jobId: string, +): RunHandle | undefined { + const handle = ctx.runner.get(jobId); + if (!handle) return undefined; + if (isExpired(handle, Date.now(), ctx.jobTtlMs, ctx.jobCheckpointTtlMs)) { + return undefined; + } + return handle; +} + +function persistSnapshot( + ctx: JobDispatchContext, + jobId: string, + snapshot?: RunHandle | PersistedJob, +): PersistedJob | undefined { + const live = snapshot ?? getLiveSnapshot(ctx, jobId); + const stored = ctx.store.get(jobId) as PersistedJob | undefined; + const base = live ?? stored; + if (!base) return undefined; + const previousProgress = + base !== undefined && "progress" in base + ? (base as PersistedJob).progress + : undefined; + const progress = + ctx.recorder.snapshot(jobId) ?? stored?.progress ?? previousProgress; + const persisted = toPersistedJob(base, progress); + ctx.store.upsert(persisted); + return persisted; +} + +function loadCurrentSnapshot( + ctx: JobDispatchContext, + jobId: string, +): PersistedJob | undefined { + sweepStore(ctx); + const live = getLiveSnapshot(ctx, jobId); + if (live) { + return persistSnapshot(ctx, jobId, live); + } + return ctx.store.get(jobId) as PersistedJob | undefined; +} + export interface DispatchStartOptions { /** Effective ceilings (from composeCeilings) to apply to workflow budget. */ readonly effective: EffectiveCeilings; @@ -82,6 +141,7 @@ export async function dispatchStart( // Build the composed workflow — apply ceiling overrides and HITL hooks. // This mirrors the setup done on the sync path (composeCeilings + buildMcpHooks) // so that both paths behave identically with respect to CLI ceilings and HITL. + sweepStore(ctx); const composedWorkflow = applyRunOpts( ctx.workflow, ctx.tool.inputTask, @@ -126,6 +186,7 @@ export async function dispatchStart( }; registerRunner(runnerName, fakeRunner); + let jobId = ""; const handle = ctx.runner.fire(composedWorkflow, parsed.data, { ...(runOpts.abortSignal !== undefined ? { signal: runOpts.abortSignal } @@ -140,15 +201,18 @@ export async function dispatchStart( onEvent: (ev: WorkflowEvent) => ctx.recorder.record(ev), onComplete: () => { unregisterRunner(runnerName); + persistSnapshot(ctx, jobId); ctx.releaseInflight(); }, onError: () => { unregisterRunner(runnerName); + persistSnapshot(ctx, jobId); ctx.releaseInflight(); }, }); - const jobId = handle.runId; + jobId = handle.runId; + persistSnapshot(ctx, jobId, handle); // Return the result immediately so the caller has the jobId. // completionPromise is available for tests that need to sync on completion. return { @@ -158,6 +222,7 @@ export async function dispatchStart( }; } + let jobId = ""; const handle = ctx.runner.fire(composedWorkflow, parsed.data, { // Wire the DurationWatchdog abort signal (if any) so the internal executor // honours the duration ceiling on the async path. @@ -177,16 +242,20 @@ export async function dispatchStart( : {}), onEvent: (ev: WorkflowEvent) => ctx.recorder.record(ev), onComplete: () => { + persistSnapshot(ctx, jobId); ctx.releaseInflight(); }, onError: () => { + persistSnapshot(ctx, jobId); ctx.releaseInflight(); }, }); + jobId = handle.runId; + persistSnapshot(ctx, jobId, handle); return { - content: [{ type: "text", text: JSON.stringify({ jobId: handle.runId }) }], - structuredContent: { jobId: handle.runId }, + content: [{ type: "text", text: JSON.stringify({ jobId }) }], + structuredContent: { jobId }, isError: false, }; } @@ -197,42 +266,44 @@ export function dispatchGetStatus( ): McpToolResult { const jobId = parseJobId(args); if (typeof jobId !== "string") return jobId; - const handle = ctx.runner.get(jobId); - if (!handle) { + const snapshot = loadCurrentSnapshot(ctx, jobId); + if (!snapshot) { return formatErrorResult( new McpServerError(ErrorCode.JOB_NOT_FOUND, `unknown jobId: ${jobId}`, { jobId, }), ); } - const snap = ctx.recorder.snapshot(jobId); const currentTask = - handle.state === "awaiting-checkpoint" && handle.pendingCheckpoint + snapshot.state === "awaiting-checkpoint" && snapshot.pendingCheckpoint ? { - name: handle.pendingCheckpoint.taskName, + name: snapshot.pendingCheckpoint.taskName, kind: "checkpoint" as const, - message: handle.pendingCheckpoint.message, + message: snapshot.pendingCheckpoint.message, } - : snap?.lastTaskStart - ? { name: snap.lastTaskStart.taskName, kind: "task" as const } + : snapshot.progress?.lastTaskStart + ? { + name: snapshot.progress.lastTaskStart.taskName, + kind: "task" as const, + } : undefined; - const progress = snap + const progress = snapshot.progress ? { - tasksCompleted: snap.tasksCompleted, + tasksCompleted: snapshot.progress.tasksCompleted, tasksTotal: ctx.taskCount, - ...(snap.lastBudgetWarning !== undefined + ...(snapshot.progress.lastBudgetWarning !== undefined ? { - spentUsd: snap.lastBudgetWarning.spentUsd, - limitUsd: snap.lastBudgetWarning.limitUsd, + spentUsd: snapshot.progress.lastBudgetWarning.spentUsd, + limitUsd: snapshot.progress.lastBudgetWarning.limitUsd, } : {}), } : undefined; const structured = { - state: handle.state, - createdAt: handle.createdAt, - lastEventAt: handle.lastEventAt, + state: snapshot.state, + createdAt: snapshot.createdAt, + lastEventAt: snapshot.lastEventAt, ...(currentTask !== undefined ? { currentTask } : {}), ...(progress !== undefined ? { progress } : {}), }; @@ -249,8 +320,8 @@ export function dispatchGetResult( ): McpToolResult { const jobId = parseJobId(args); if (typeof jobId !== "string") return jobId; - const handle = ctx.runner.get(jobId); - if (!handle) { + const snapshot = loadCurrentSnapshot(ctx, jobId); + if (!snapshot) { return formatErrorResult( new McpServerError(ErrorCode.JOB_NOT_FOUND, `unknown jobId: ${jobId}`, { jobId, @@ -258,14 +329,17 @@ export function dispatchGetResult( ); } - if (handle.state === "running" || handle.state === "awaiting-checkpoint") { + if ( + snapshot.state === "running" || + snapshot.state === "awaiting-checkpoint" + ) { return { content: [{ type: "text", text: JSON.stringify({ pending: true }) }], structuredContent: { pending: true }, isError: false, }; } - if (handle.state === "cancelled") { + if (snapshot.state === "cancelled") { return formatErrorResult( new McpServerError( ErrorCode.JOB_CANCELLED, @@ -274,17 +348,17 @@ export function dispatchGetResult( ), ); } - if (handle.state === "failed") { + if (snapshot.state === "failed") { return formatErrorResult( new McpServerError( ErrorCode.WORKFLOW_FAILED, - handle.error?.message ?? "workflow failed", - { jobId, error: handle.error }, + snapshot.error?.message ?? "workflow failed", + { jobId, error: snapshot.error }, ), ); } // done — re-validate the output task's result through the output Zod schema. - const raw = handle.result?.outputs[ctx.tool.outputTask]; + const raw = snapshot.result?.outputs[ctx.tool.outputTask]; const outputTaskDef = ctx.workflow.tasks[ctx.tool.outputTask] as { agent: { output: import("zod").ZodType }; }; @@ -301,7 +375,7 @@ export function dispatchGetResult( const structured = { state: "done" as const, output: parsedOutput.data, - metrics: handle.result?.metrics, + metrics: snapshot.result?.metrics, }; return { content: [{ type: "text", text: JSON.stringify(structured) }], @@ -316,7 +390,8 @@ export function dispatchCancel( ): McpToolResult { const jobId = parseJobId(args); if (typeof jobId !== "string") return jobId; - const handle = ctx.runner.get(jobId); + sweepStore(ctx); + const handle = getLiveSnapshot(ctx, jobId); if (!handle) { return formatErrorResult( new McpServerError(ErrorCode.JOB_NOT_FOUND, `unknown jobId: ${jobId}`, { @@ -338,6 +413,7 @@ export function dispatchCancel( } const priorState = handle.state; ctx.runner.cancel(jobId); + persistSnapshot(ctx, jobId); return { content: [ { @@ -356,11 +432,23 @@ export function dispatchResume( ): McpToolResult { const parsed = parseResumeArgs(args); if ("isError" in parsed) return parsed; + sweepStore(ctx); + const handle = getLiveSnapshot(ctx, parsed.jobId); + if (!handle) { + return formatErrorResult( + new McpServerError( + ErrorCode.JOB_NOT_FOUND, + `unknown jobId: ${parsed.jobId}`, + { jobId: parsed.jobId }, + ), + ); + } try { ctx.runner.resume(parsed.jobId, parsed.approved); } catch (err) { return formatErrorResult(err); } + persistSnapshot(ctx, parsed.jobId); return { content: [{ type: "text", text: JSON.stringify({ resumed: true }) }], structuredContent: { resumed: true }, @@ -411,6 +499,7 @@ export function createJobDispatchContext(args: { workflow: WorkflowDef; tool: ToolDefinition; jobTools: readonly ToolDefinition[]; + jobStore: RunStore; jobTtlMs?: number; jobCheckpointTtlMs?: number; releaseInflight: () => void; @@ -418,13 +507,17 @@ export function createJobDispatchContext(args: { const runner = createRunner({ ttlMs: args.jobTtlMs ?? 30 * 60_000, checkpointTtlMs: args.jobCheckpointTtlMs ?? 60 * 60_000, + ...(args.jobStore !== undefined ? { store: args.jobStore } : {}), }); return { runner, recorder: new JobEventRecorder(), + store: args.jobStore, workflow: args.workflow, tool: args.tool, jobTools: args.jobTools, + jobTtlMs: args.jobTtlMs ?? 30 * 60_000, + jobCheckpointTtlMs: args.jobCheckpointTtlMs ?? 60 * 60_000, taskCount: Object.keys(args.workflow.tasks).length, releaseInflight: args.releaseInflight, }; diff --git a/packages/mcp-server/src/job-store-sqlite.ts b/packages/mcp-server/src/job-store-sqlite.ts new file mode 100644 index 0000000..f928fc7 --- /dev/null +++ b/packages/mcp-server/src/job-store-sqlite.ts @@ -0,0 +1,8 @@ +import type { JobStore, PersistedJob } from "./job-store.js"; + +export async function createSqliteJobStore(dbPath: string): Promise { + const { SqliteRunStore } = await import( + /* @vite-ignore */ "@ageflow/server-sqlite" + ); + return new SqliteRunStore(dbPath) as unknown as JobStore; +} diff --git a/packages/mcp-server/src/job-store.ts b/packages/mcp-server/src/job-store.ts new file mode 100644 index 0000000..ca6e776 --- /dev/null +++ b/packages/mcp-server/src/job-store.ts @@ -0,0 +1,96 @@ +import type { + CheckpointEvent, + RunHandle, + WorkflowMetrics, +} from "@ageflow/core"; +import type { RunStore } from "@ageflow/server"; +import type { ProgressSnapshot } from "./job-event-recorder.js"; + +export interface PersistedJob extends RunHandle { + readonly pendingCheckpoint?: CheckpointEvent; + readonly result?: { + readonly outputs: Record; + readonly metrics: WorkflowMetrics; + }; + readonly error?: { readonly name: string; readonly message: string }; + readonly progress?: ProgressSnapshot; +} + +export interface JobStore extends RunStore { + get(runId: string): PersistedJob | undefined; + list(): readonly PersistedJob[]; + upsert(job: RunHandle): void; +} + +export class InMemoryJobStore implements JobStore { + private readonly jobs = new Map(); + + get(runId: string): PersistedJob | undefined { + const job = this.jobs.get(runId); + return job ? structuredClone(job) : undefined; + } + + list(): readonly PersistedJob[] { + return [...this.jobs.values()].map((job) => structuredClone(job)); + } + + upsert(job: RunHandle): void { + this.jobs.set(job.runId, structuredClone(job)); + } + + delete(runId: string): void { + this.jobs.delete(runId); + } + + close(): void { + this.jobs.clear(); + } +} + +export function isExpired( + job: Pick, + now: number, + ttlMs: number, + checkpointTtlMs: number, +): boolean { + if (job.state === "awaiting-checkpoint") { + return now - job.lastEventAt > checkpointTtlMs; + } + if ( + job.state === "done" || + job.state === "failed" || + job.state === "cancelled" + ) { + return now - job.lastEventAt > ttlMs; + } + return false; +} + +export function toPersistedJob( + snapshot: Pick< + RunHandle, + | "runId" + | "workflowName" + | "state" + | "createdAt" + | "lastEventAt" + | "pendingCheckpoint" + | "result" + | "error" + >, + progress?: ProgressSnapshot, +): PersistedJob { + return { + runId: snapshot.runId, + workflowName: snapshot.workflowName, + state: snapshot.state, + createdAt: snapshot.createdAt, + lastEventAt: snapshot.lastEventAt, + ...(snapshot.pendingCheckpoint !== undefined + ? { pendingCheckpoint: snapshot.pendingCheckpoint } + : {}), + ...(snapshot.result !== undefined ? { result: snapshot.result } : {}), + ...(snapshot.error !== undefined ? { error: snapshot.error } : {}), + ...(progress !== undefined ? { progress } : {}), + }; +} diff --git a/packages/mcp-server/src/programmatic.ts b/packages/mcp-server/src/programmatic.ts index af64e96..b10fb89 100644 --- a/packages/mcp-server/src/programmatic.ts +++ b/packages/mcp-server/src/programmatic.ts @@ -14,6 +14,7 @@ */ import type { WorkflowDef } from "@ageflow/core"; +import type { RunStore } from "@ageflow/server"; import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { type HttpTransportHandle, @@ -153,6 +154,12 @@ export interface McpServerConfig { */ ceilings?: CliCeilings; + /** + * Optional async job snapshot store shared across workflow handles. + * When omitted, each workflow handle gets its own in-memory store. + */ + jobStore?: RunStore; + /** * MCP server name advertised during initialization. Defaults to the first * workflow's name (or "ageflow-mcp" for multi-workflow setups). @@ -436,6 +443,7 @@ export function createMcpServer(config: McpServerConfig): McpHandle { const handle = createSingleWorkflowServer({ workflow: patchedWorkflow, cliCeilings: config.ceilings ?? {}, + ...(config.jobStore !== undefined ? { jobStore: config.jobStore } : {}), // When onHitl is set it is baked into patchedWorkflow.hooks.onCheckpoint. // buildMcpHooks in hitl-bridge.ts calls that hook first: if it returns true // the checkpoint is approved; if it returns false the bridge falls through diff --git a/packages/mcp-server/src/server.ts b/packages/mcp-server/src/server.ts index 139d7a0..212bbd6 100644 --- a/packages/mcp-server/src/server.ts +++ b/packages/mcp-server/src/server.ts @@ -1,6 +1,7 @@ import type { WorkflowDef, WorkflowHooks } from "@ageflow/core"; import { resolveMcpConfig } from "@ageflow/core"; import { WorkflowExecutor } from "@ageflow/executor"; +import { InMemoryRunStore, type RunStore } from "@ageflow/server"; import { composeCeilings } from "./ceiling-resolver.js"; import { ErrorCode, @@ -54,6 +55,12 @@ export interface McpServerOptions { readonly jobTtlMs?: number; /** Override the default 1-hour checkpoint TTL (async mode only). */ readonly jobCheckpointTtlMs?: number; + /** Optional async job registry store. Defaults to an in-memory store. */ + readonly runStore?: RunStore; + /** Back-compat alias for `runStore`. */ + readonly jobStore?: RunStore; + /** Optional SQLite job database path. Used when no explicit store is provided. */ + readonly jobDbPath?: string; /** Custom stderr writer (for testing); defaults to process.stderr.write. */ readonly stderr?: (line: string) => void; /** @@ -141,7 +148,9 @@ export function createSingleWorkflowServer( const jobTools = opts.async === true ? buildJobTools(opts.workflow) : []; let inflight = false; - let dispatchCtx: JobDispatchContext | undefined; // lazy — created on first start_* + let dispatchCtx: JobDispatchContext | undefined; + let dispatchCtxPromise: Promise | undefined; + let jobStorePromise: Promise | undefined; const startName = `start_${opts.workflow.name}`; const OBSERVER_TOOLS = new Set([ @@ -151,22 +160,109 @@ export function createSingleWorkflowServer( "resume_workflow", ]); - function ensureDispatchCtx(): JobDispatchContext { - if (!dispatchCtx) { + function resolveJobStore(): Promise { + if (jobStorePromise !== undefined) { + return jobStorePromise; + } + + if (opts.runStore !== undefined) { + jobStorePromise = Promise.resolve(opts.runStore); + return jobStorePromise; + } + + if (opts.jobStore !== undefined) { + jobStorePromise = Promise.resolve(opts.jobStore); + return jobStorePromise; + } + + if (opts.jobDbPath !== undefined) { + jobStorePromise = import(/* @vite-ignore */ "@ageflow/server-sqlite").then( + ({ SqliteRunStore }) => new SqliteRunStore(opts.jobDbPath!), + ); + return jobStorePromise; + } + + jobStorePromise = Promise.resolve(new InMemoryRunStore()); + return jobStorePromise; + } + + function ensureDispatchCtxSync(): JobDispatchContext { + if (dispatchCtx !== undefined) { + return dispatchCtx; + } + + const jobStore = + opts.runStore ?? + opts.jobStore ?? + (opts.jobDbPath !== undefined ? undefined : new InMemoryRunStore()); + + if (jobStore === undefined) { + throw new Error("dispatch context is not ready"); + } + + dispatchCtx = createJobDispatchContext({ + workflow: opts.workflow, + tool, + jobTools, + jobStore, + ...(opts.jobTtlMs !== undefined ? { jobTtlMs: opts.jobTtlMs } : {}), + ...(opts.jobCheckpointTtlMs !== undefined + ? { jobCheckpointTtlMs: opts.jobCheckpointTtlMs } + : {}), + releaseInflight: () => {}, + }); + dispatchCtx.runner.recover?.(opts.workflow); + return dispatchCtx; + } + + function ensureDispatchCtx(): Promise { + if (dispatchCtx !== undefined) { + return Promise.resolve(dispatchCtx); + } + + if (opts.jobDbPath === undefined) { + return Promise.resolve(ensureDispatchCtxSync()); + } + + dispatchCtxPromise ??= resolveJobStore().then((jobStore) => { dispatchCtx = createJobDispatchContext({ workflow: opts.workflow, tool, jobTools, + jobStore, ...(opts.jobTtlMs !== undefined ? { jobTtlMs: opts.jobTtlMs } : {}), ...(opts.jobCheckpointTtlMs !== undefined ? { jobCheckpointTtlMs: opts.jobCheckpointTtlMs } : {}), - releaseInflight: () => { - inflight = false; - }, + releaseInflight: () => {}, }); + dispatchCtx.runner.recover?.(opts.workflow); + return dispatchCtx; + }); + + return dispatchCtxPromise; + } + + if (opts.async === true) { + if (opts.jobDbPath === undefined) { + ensureDispatchCtxSync(); + } else { + void ensureDispatchCtx(); } - return dispatchCtx; + } + + async function hasActiveRun(): Promise { + const ctx = + dispatchCtx !== undefined + ? dispatchCtx + : opts.jobDbPath === undefined + ? ensureDispatchCtxSync() + : await ensureDispatchCtx(); + return ctx.runner.list().some( + (snapshot) => + snapshot.state === "running" || + snapshot.state === "awaiting-checkpoint", + ); } const handle: McpServerHandle = { @@ -176,6 +272,9 @@ export function createSingleWorkflowServer( dispose() { dispatchCtx?.runner.close(); + void resolveJobStore().then((jobStore) => { + jobStore.close(); + }); }, async callTool(name, args, callOpts) { @@ -205,7 +304,7 @@ export function createSingleWorkflowServer( // Observer tools: no inflight lock. if (isObserver) { - const ctx = ensureDispatchCtx(); + const ctx = await ensureDispatchCtx(); switch (name) { case "get_workflow_status": return dispatchGetStatus(args, ctx); @@ -218,92 +317,120 @@ export function createSingleWorkflowServer( } } - // start_* and sync tool: share the inflight lock. - if (inflight) { - return formatErrorResult( - new McpServerError( - ErrorCode.BUSY, - "Another workflow run is in progress", - ), - ); - } - inflight = true; - if (isStart) { - const ctx = ensureDispatchCtx(); - // Thread test hooks into context (test-only). - if (handle._testOnComposedWorkflow !== undefined) { - ctx._testOnComposedWorkflow = handle._testOnComposedWorkflow; + if (inflight) { + return formatErrorResult( + new McpServerError( + ErrorCode.BUSY, + "Another workflow run is in progress", + ), + ); } - // Thread test executor into context if set on handle (wraps 4-arg to 1-arg) - if (handle._testRunExecutor !== undefined) { - const testExec = handle._testRunExecutor; - ctx.testRunExecutor = (input: unknown) => - testExec( - input, - undefined, - new AbortController().signal, - {} as EffectiveCeilings, + inflight = true; + try { + if (await hasActiveRun()) { + return formatErrorResult( + new McpServerError( + ErrorCode.BUSY, + "Another workflow run is in progress", + ), ); - } + } + const ctx = + opts.jobDbPath === undefined + ? ensureDispatchCtxSync() + : await ensureDispatchCtx(); + // Thread test hooks into context (test-only). + if (handle._testOnComposedWorkflow !== undefined) { + ctx._testOnComposedWorkflow = handle._testOnComposedWorkflow; + } + // Thread test executor into context if set on handle (wraps 4-arg to 1-arg) + if (handle._testRunExecutor !== undefined) { + const testExec = handle._testRunExecutor; + ctx.testRunExecutor = (input: unknown) => + testExec( + input, + undefined, + new AbortController().signal, + {} as EffectiveCeilings, + ); + } + + // Apply the same ceiling/hook composition as the sync path so that + // async jobs respect CLI ceiling overrides and the configured HITL strategy. + const asyncEffective = composeCeilings( + resolved, + opts.cliCeilings, + stderr, + ); - // Apply the same ceiling/hook composition as the sync path so that - // async jobs respect CLI ceiling overrides and the configured HITL strategy. - const asyncEffective = composeCeilings( - resolved, - opts.cliCeilings, - stderr, - ); + // DurationWatchdog for async: instead of Promise.race (which requires + // awaiting the run), we wire an AbortController into runner.fire() via + // the `signal` option. When maxDurationSec elapses the signal fires, + // the internal executor stream receives it, and the run is cancelled. + // This is semantically equivalent to the sync watchdog (both abort the + // underlying executor) but adapted for fire-and-forget execution. + // Note: the run will appear as "cancelled" (not "duration-exceeded") in + // the job registry, which is the correct terminal state for async jobs + // that hit their duration ceiling. A finer-grained DURATION_EXCEEDED + // status is left as a follow-up (tracked in #84 item 11). + let asyncWatchdogSignal: AbortSignal | undefined; + if (asyncEffective.maxDurationSec !== null) { + const asyncWatchdog = new DurationWatchdog( + asyncEffective.maxDurationSec, + () => {}, + ); + asyncWatchdog.start(); + asyncWatchdogSignal = asyncWatchdog.abortSignal; + } + + // Derive async HITL strategy: map hitlStrategy to a simple checkpoint + // resolver for runner.fire(). The async path cannot use MCP elicitation + // (no persistent connection during fire-and-forget execution), so: + // "auto" → immediately approve + // "fail" → immediately reject + // "elicit" → undefined (runner's deferred path; client uses resume_workflow) + const asyncOnCheckpoint: DispatchStartOptions["onCheckpoint"] = + opts.hitlStrategy === "auto" + ? () => true + : opts.hitlStrategy === "fail" + ? () => false + : undefined; // elicit → deferred resume_workflow mechanism + + const dispatchOpts: DispatchStartOptions = { + effective: asyncEffective, + ...(asyncOnCheckpoint !== undefined + ? { onCheckpoint: asyncOnCheckpoint } + : {}), + ...(asyncWatchdogSignal !== undefined + ? { abortSignal: asyncWatchdogSignal } + : {}), + }; - // DurationWatchdog for async: instead of Promise.race (which requires - // awaiting the run), we wire an AbortController into runner.fire() via - // the `signal` option. When maxDurationSec elapses the signal fires, - // the internal executor stream receives it, and the run is cancelled. - // This is semantically equivalent to the sync watchdog (both abort the - // underlying executor) but adapted for fire-and-forget execution. - // Note: the run will appear as "cancelled" (not "duration-exceeded") in - // the job registry, which is the correct terminal state for async jobs - // that hit their duration ceiling. A finer-grained DURATION_EXCEEDED - // status is left as a follow-up (tracked in #84 item 11). - let asyncWatchdogSignal: AbortSignal | undefined; - if (asyncEffective.maxDurationSec !== null) { - const asyncWatchdog = new DurationWatchdog( - asyncEffective.maxDurationSec, - () => {}, - ); - asyncWatchdog.start(); - asyncWatchdogSignal = asyncWatchdog.abortSignal; + // dispatchStart releases inflight via ctx.releaseInflight (onComplete/onError) + // or on validation failure. + return await dispatchStart(name, args, ctx, dispatchOpts); + } finally { + inflight = false; } + } - // Derive async HITL strategy: map hitlStrategy to a simple checkpoint - // resolver for runner.fire(). The async path cannot use MCP elicitation - // (no persistent connection during fire-and-forget execution), so: - // "auto" → immediately approve - // "fail" → immediately reject - // "elicit" → undefined (runner's deferred path; client uses resume_workflow) - const asyncOnCheckpoint: DispatchStartOptions["onCheckpoint"] = - opts.hitlStrategy === "auto" - ? () => true - : opts.hitlStrategy === "fail" - ? () => false - : undefined; // elicit → deferred resume_workflow mechanism - - const dispatchOpts: DispatchStartOptions = { - effective: asyncEffective, - ...(asyncOnCheckpoint !== undefined - ? { onCheckpoint: asyncOnCheckpoint } - : {}), - ...(asyncWatchdogSignal !== undefined - ? { abortSignal: asyncWatchdogSignal } - : {}), - }; - - // dispatchStart releases inflight via ctx.releaseInflight (onComplete/onError) - // or on validation failure. - return await dispatchStart(name, args, ctx, dispatchOpts); + // Sync tool: share the inflight lock and hydrated active-run state. + if (isSync) { + const busy = + inflight || (opts.async === true && (await hasActiveRun())); + if (busy) { + return formatErrorResult( + new McpServerError( + ErrorCode.BUSY, + "Another workflow run is in progress", + ), + ); + } } // Sync path (existing body) — inflight cleared in finally. + inflight = true; try { // Validate input const inputTaskDef = (opts.workflow.tasks as Record)[ diff --git a/packages/mcp-server/tsconfig.json b/packages/mcp-server/tsconfig.json index 87a0160..771ac6d 100644 --- a/packages/mcp-server/tsconfig.json +++ b/packages/mcp-server/tsconfig.json @@ -7,13 +7,15 @@ "paths": { "@ageflow/core": ["../core/src/index.ts"], "@ageflow/executor": ["../executor/src/index.ts"], - "@ageflow/server": ["../server/src/index.ts"] + "@ageflow/server": ["../server/src/index.ts"], + "@ageflow/server-sqlite": ["../server-sqlite/src/index.ts"] } }, "references": [ { "path": "../core" }, { "path": "../executor" }, - { "path": "../server" } + { "path": "../server" }, + { "path": "../server-sqlite" } ], "include": ["src/**/*"], "exclude": [ diff --git a/packages/mcp-server/vitest.config.ts b/packages/mcp-server/vitest.config.ts index 65bb30d..688f444 100644 --- a/packages/mcp-server/vitest.config.ts +++ b/packages/mcp-server/vitest.config.ts @@ -1,6 +1,18 @@ import { defineConfig } from "vitest/config"; +import { fileURLToPath } from "node:url"; + +const serverSqliteSource = fileURLToPath( + new URL("../server-sqlite/src/index.ts", import.meta.url), +); export default defineConfig({ + resolve: { + alias: { + // Keep tests on workspace source to avoid dependency prebundle issues + // around Bun builtins (bun:sqlite) in dist artifacts. + "@ageflow/server-sqlite": serverSqliteSource, + }, + }, test: { environment: "node", include: ["src/**/*.test.ts"], diff --git a/packages/server-sqlite/package.json b/packages/server-sqlite/package.json new file mode 100644 index 0000000..f1c08f2 --- /dev/null +++ b/packages/server-sqlite/package.json @@ -0,0 +1,35 @@ +{ + "name": "@ageflow/server-sqlite", + "version": "0.2.0", + "description": "SQLite-backed RunStore implementation for @ageflow/server.", + "homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/server-sqlite", + "type": "module", + "private": false, + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "files": ["dist"], + "scripts": { + "build": "tsc", + "typecheck": "tsc --noEmit", + "test": "bun --bun x vitest run", + "lint": "biome check src/" + }, + "dependencies": { + "@ageflow/server": "^0.6.0" + }, + "devDependencies": { + "@types/bun": "^1.3.12", + "@types/node": "^22.0.0", + "vitest": "^2.1.0" + }, + "repository": { + "type": "git", + "url": "https://github.com/Neftedollar/ageflow.git" + }, + "keywords": ["sqlite", "server", "workflow", "bun", "persistence"], + "license": "MIT" +} diff --git a/packages/server-sqlite/src/__tests__/sqlite-run-store.test.ts b/packages/server-sqlite/src/__tests__/sqlite-run-store.test.ts new file mode 100644 index 0000000..732c0c7 --- /dev/null +++ b/packages/server-sqlite/src/__tests__/sqlite-run-store.test.ts @@ -0,0 +1,106 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import type { WorkflowMetrics } from "@ageflow/core"; +import type { PersistedRunRecord } from "@ageflow/server"; +import { afterEach, describe, expect, it } from "vitest"; +import { SqliteRunStore } from "../sqlite-run-store.js"; + +function makeMetrics(): WorkflowMetrics { + return { + totalLatencyMs: 100, + totalTokensIn: 0, + totalTokensOut: 0, + totalEstimatedCost: 0, + taskCount: 1, + }; +} + +function makeSnapshot( + overrides: Partial = {}, +): PersistedRunRecord { + return { + runId: crypto.randomUUID(), + workflowName: "test-workflow", + state: "done", + createdAt: 1_700_000_000_000, + lastEventAt: 1_700_000_000_000, + input: { persisted: true }, + result: { + outputs: { ok: true }, + metrics: makeMetrics(), + }, + ...overrides, + }; +} + +describe("SqliteRunStore", () => { + let tmpDir: string | undefined; + + afterEach(() => { + if (tmpDir !== undefined) { + rmSync(tmpDir, { recursive: true, force: true }); + tmpDir = undefined; + } + }); + + it("round-trips snapshots through sqlite", () => { + tmpDir = mkdtempSync(path.join(os.tmpdir(), "ageflow-run-store-")); + const store = new SqliteRunStore(path.join(tmpDir, "runs.sqlite")); + const snapshot = makeSnapshot(); + + store.upsert(snapshot); + + expect(store.get(snapshot.runId)).toEqual(snapshot); + expect(store.list()).toEqual([snapshot]); + + store.close(); + }); + + it("orders list by last event time and deletes entries", () => { + tmpDir = mkdtempSync(path.join(os.tmpdir(), "ageflow-run-store-")); + const store = new SqliteRunStore(path.join(tmpDir, "runs.sqlite")); + const older = makeSnapshot({ runId: "older", lastEventAt: 1 }); + const newer = makeSnapshot({ runId: "newer", lastEventAt: 2 }); + + store.upsert(older); + store.upsert(newer); + + expect(store.list().map((snapshot) => snapshot.runId)).toEqual([ + "newer", + "older", + ]); + + store.delete(older.runId); + expect(store.get(older.runId)).toBeUndefined(); + + store.close(); + }); + + it("becomes inert after close", () => { + tmpDir = mkdtempSync(path.join(os.tmpdir(), "ageflow-run-store-")); + const store = new SqliteRunStore(path.join(tmpDir, "runs.sqlite")); + const snapshot = makeSnapshot(); + + store.upsert(snapshot); + store.close(); + + expect(store.get(snapshot.runId)).toBeUndefined(); + expect(store.list()).toEqual([]); + }); + + it("reopens the same database with the persisted row intact", () => { + tmpDir = mkdtempSync(path.join(os.tmpdir(), "ageflow-run-store-")); + const dbPath = path.join(tmpDir, "runs.sqlite"); + const snapshot = makeSnapshot(); + + const first = new SqliteRunStore(dbPath); + first.upsert(snapshot); + first.close(); + + const second = new SqliteRunStore(dbPath); + expect(second.get(snapshot.runId)).toEqual(snapshot); + expect(second.list()).toEqual([snapshot]); + second.close(); + }); +}); diff --git a/packages/server-sqlite/src/index.ts b/packages/server-sqlite/src/index.ts new file mode 100644 index 0000000..440e0a8 --- /dev/null +++ b/packages/server-sqlite/src/index.ts @@ -0,0 +1 @@ +export { SqliteRunStore } from "./sqlite-run-store.js"; diff --git a/packages/server-sqlite/src/sqlite-run-store.ts b/packages/server-sqlite/src/sqlite-run-store.ts new file mode 100644 index 0000000..29d8b5f --- /dev/null +++ b/packages/server-sqlite/src/sqlite-run-store.ts @@ -0,0 +1,169 @@ +import { mkdirSync } from "node:fs"; +import { createRequire } from "node:module"; +import path from "node:path"; +import type { PersistedRunRecord, RunStore } from "@ageflow/server"; + +interface SnapshotRow { + readonly payload: string; +} + +interface SqliteStatement { + run(params?: unknown): unknown; + get(params?: unknown): unknown; + all(params?: unknown): unknown; +} + +interface SqliteAdapter { + exec(sql: string): void; + prepare(sql: string): SqliteStatement; + close(): void; +} + +function createSqliteAdapter(dbPath: string): SqliteAdapter { + const require = createRequire(import.meta.url); + + try { + const { Database } = require("bun:sqlite") as typeof import("bun:sqlite"); + const db = new Database(dbPath, { create: true }); + return { + exec(sql: string): void { + db.query(sql).run(); + }, + prepare(sql: string): SqliteStatement { + const stmt = db.query(sql); + return { + run(params?: unknown): unknown { + return stmt.run(params as any); + }, + get(params?: unknown): unknown { + return stmt.get(params as any); + }, + all(params?: unknown): unknown { + return stmt.all(params as any); + }, + }; + }, + close(): void { + db.close(false); + }, + }; + } catch { + const { DatabaseSync } = require("node:sqlite") as typeof import( + "node:sqlite" + ); + const db = new DatabaseSync(dbPath); + return { + exec(sql: string): void { + db.exec(sql); + }, + prepare(sql: string): SqliteStatement { + const stmt = db.prepare(sql); + return { + run(params?: unknown): unknown { + return stmt.run((params ?? {}) as any); + }, + get(params?: unknown): unknown { + return stmt.get((params ?? {}) as any) as unknown; + }, + all(params?: unknown): unknown { + return stmt.all((params ?? {}) as any) as unknown; + }, + }; + }, + close(): void { + db.close(); + }, + }; + } +} + +export class SqliteRunStore implements RunStore { + private readonly db: SqliteAdapter; + private readonly getStmt: SqliteStatement; + private readonly listStmt: SqliteStatement; + private readonly upsertStmt: SqliteStatement; + private readonly deleteStmt: SqliteStatement; + private closed = false; + + constructor(dbPath: string) { + if (dbPath !== ":memory:" && dbPath !== "") { + mkdirSync(path.dirname(dbPath), { recursive: true }); + } + + this.db = createSqliteAdapter(dbPath); + if (dbPath !== ":memory:" && dbPath !== "") { + this.db.exec("PRAGMA journal_mode = WAL;"); + } + this.db.exec(` + CREATE TABLE IF NOT EXISTS runs ( + runId TEXT PRIMARY KEY, + lastEventAt INTEGER NOT NULL, + payload TEXT NOT NULL + ) + `); + this.db.exec(` + CREATE INDEX IF NOT EXISTS idx_runs_lastEventAt + ON runs(lastEventAt) + `); + + this.getStmt = this.db.prepare( + "SELECT payload FROM runs WHERE runId = $runId", + ); + this.listStmt = this.db.prepare( + "SELECT payload FROM runs ORDER BY lastEventAt DESC", + ); + this.upsertStmt = this.db.prepare(` + INSERT INTO runs (runId, lastEventAt, payload) + VALUES ($runId, $lastEventAt, $payload) + ON CONFLICT(runId) DO UPDATE SET + lastEventAt = excluded.lastEventAt, + payload = excluded.payload + `); + this.deleteStmt = this.db.prepare("DELETE FROM runs WHERE runId = $runId"); + } + + private ensureOpen(): boolean { + return !this.closed; + } + + private parseSnapshot( + row: SnapshotRow | undefined, + ): PersistedRunRecord | undefined { + if (!row) return undefined; + return JSON.parse(row.payload) as PersistedRunRecord; + } + + get(runId: string): PersistedRunRecord | undefined { + if (!this.ensureOpen()) return undefined; + return this.parseSnapshot( + this.getStmt.get({ $runId: runId }) as SnapshotRow | undefined, + ); + } + + list(): readonly PersistedRunRecord[] { + if (!this.ensureOpen()) return []; + return (this.listStmt.all() as SnapshotRow[]).map( + (row) => JSON.parse(row.payload) as PersistedRunRecord, + ); + } + + upsert(snapshot: PersistedRunRecord): void { + if (!this.ensureOpen()) return; + this.upsertStmt.run({ + $runId: snapshot.runId, + $lastEventAt: snapshot.lastEventAt, + $payload: JSON.stringify(snapshot), + }); + } + + delete(runId: string): void { + if (!this.ensureOpen()) return; + this.deleteStmt.run({ $runId: runId }); + } + + close(): void { + if (this.closed) return; + this.closed = true; + this.db.close(); + } +} diff --git a/packages/server-sqlite/tsconfig.json b/packages/server-sqlite/tsconfig.json new file mode 100644 index 0000000..e651cf2 --- /dev/null +++ b/packages/server-sqlite/tsconfig.json @@ -0,0 +1,22 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src", + "tsBuildInfoFile": "dist/.tsbuildinfo", + "types": ["bun"], + "paths": { + "@ageflow/server": ["../server/src/index.ts"] + } + }, + "references": [{ "path": "../server" }], + "include": ["src/**/*.ts"], + "exclude": [ + "src/**/*.test.ts", + "src/**/*.test-d.ts", + "src/**/* [0-9]*.*", + "src/**/* [0-9]*/**", + "dist", + "node_modules" + ] +} diff --git a/packages/server/README.md b/packages/server/README.md index 2e4d6f3..ad40515 100644 --- a/packages/server/README.md +++ b/packages/server/README.md @@ -251,8 +251,9 @@ All errors extend `AgentFlowError` from `@ageflow/core`. - **No bundled HTTP middleware.** Turning events into SSE / WebSocket / JSONL is the caller's responsibility. A separate `@ageflow/server-http` package may ship in v0.2. -- **No persistence.** The run registry is in-memory. Pluggable `RunStore` - (SQLite / Redis) is planned for v0.2. +- **In-memory by default; persistence is pluggable.** The run registry + keeps active handles in-process. When a durable `RunStore` is provided, + run snapshots can survive restart and be hydrated by higher layers. - **No distributed runs.** Single-process only. - **No `subscribe(runId)`.** Joining an in-flight run from a second observer is deferred to v0.2. diff --git a/packages/server/package.json b/packages/server/package.json index 07aca28..0ac4d03 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -1,6 +1,6 @@ { "name": "@ageflow/server", - "version": "0.4.5", + "version": "0.6.0", "description": "Embeddable execution surface for ageflow workflows: stream, fire-and-forget, async HITL, cancellation.", "homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/server", "type": "module", diff --git a/packages/server/src/__tests__/run-registry.test.ts b/packages/server/src/__tests__/run-registry.test.ts index 4ad27cf..388cfc1 100644 --- a/packages/server/src/__tests__/run-registry.test.ts +++ b/packages/server/src/__tests__/run-registry.test.ts @@ -1,9 +1,44 @@ +import type { RunHandle } from "@ageflow/core"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { RunRegistry } from "../run-registry.js"; +import type { PersistedRunRecord, RunStore } from "../types.js"; beforeEach(() => vi.useFakeTimers()); afterEach(() => vi.useRealTimers()); +class RecordingRunStore implements RunStore { + readonly snapshots = new Map(); + readonly deleted: string[] = []; + + constructor(initial: readonly PersistedRunRecord[] = []) { + for (const snapshot of initial) { + this.snapshots.set(snapshot.runId, structuredClone(snapshot)); + } + } + + get(runId: string): PersistedRunRecord | undefined { + const snapshot = this.snapshots.get(runId); + return snapshot !== undefined ? structuredClone(snapshot) : undefined; + } + + list(): readonly PersistedRunRecord[] { + return [...this.snapshots.values()].map((snapshot) => + structuredClone(snapshot), + ); + } + + upsert(snapshot: PersistedRunRecord): void { + this.snapshots.set(snapshot.runId, structuredClone(snapshot)); + } + + delete(runId: string): void { + this.deleted.push(runId); + this.snapshots.delete(runId); + } + + close(): void {} +} + describe("RunRegistry", () => { it("stores, retrieves, lists", () => { const reg = new RunRegistry({ @@ -28,6 +63,41 @@ describe("RunRegistry", () => { reg.stop(); }); + it("hydrates existing snapshots from the store", () => { + const seededStore = new RecordingRunStore([ + { + runId: "seed", + workflowName: "wf", + state: "done", + createdAt: 100, + lastEventAt: 200, + input: { recovered: true }, + result: { + outputs: { ok: true }, + metrics: { + totalLatencyMs: 1, + totalTokensIn: 2, + totalTokensOut: 3, + totalEstimatedCost: 0.1, + taskCount: 1, + }, + }, + }, + ]); + const reg = new RunRegistry({ + ttlMs: 1000, + checkpointTtlMs: 2000, + reaperIntervalMs: 500, + store: seededStore, + }); + expect(reg.get("seed")).toMatchObject({ + runId: "seed", + state: "done", + }); + expect(reg.list()).toHaveLength(1); + reg.stop(); + }); + it("reaper evicts terminal runs after ttlMs", () => { const reg = new RunRegistry({ ttlMs: 1000, @@ -76,4 +146,36 @@ describe("RunRegistry", () => { expect(reg.get("r1")?.state).toBe("failed"); reg.stop(); }); + + it("deletes expired terminal snapshots from the backing store", () => { + const store = new RecordingRunStore([ + { + runId: "r1", + workflowName: "wf", + state: "done", + createdAt: 0, + lastEventAt: 0, + result: { + outputs: {}, + metrics: { + totalLatencyMs: 0, + totalTokensIn: 0, + totalTokensOut: 0, + totalEstimatedCost: 0, + taskCount: 0, + }, + }, + }, + ]); + const reg = new RunRegistry({ + ttlMs: 1000, + checkpointTtlMs: 2000, + reaperIntervalMs: 100, + store, + }); + vi.advanceTimersByTime(1500); + expect(reg.get("r1")).toBeUndefined(); + expect(store.deleted).toContain("r1"); + reg.stop(); + }); }); diff --git a/packages/server/src/__tests__/runner.fire.test.ts b/packages/server/src/__tests__/runner.fire.test.ts index db40fb7..f5ed440 100644 --- a/packages/server/src/__tests__/runner.fire.test.ts +++ b/packages/server/src/__tests__/runner.fire.test.ts @@ -4,10 +4,11 @@ import { registerRunner, unregisterRunner, } from "@ageflow/core"; -import type { Runner as AgentRunner } from "@ageflow/core"; +import type { Runner as AgentRunner, RunHandle } from "@ageflow/core"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { z } from "zod"; import { createRunner } from "../runner.js"; +import type { PersistedRunRecord, RunStore } from "../types.js"; const stub: AgentRunner = { validate: async () => ({ ok: true }), @@ -29,6 +30,33 @@ const wf = defineWorkflow({ tasks: { t: { agent, input: {} } }, }); +class RecordingRunStore implements RunStore { + readonly snapshots = new Map(); + readonly upserts: PersistedRunRecord[] = []; + + get(runId: string): PersistedRunRecord | undefined { + const snapshot = this.snapshots.get(runId); + return snapshot !== undefined ? structuredClone(snapshot) : undefined; + } + + list(): readonly PersistedRunRecord[] { + return [...this.snapshots.values()].map((snapshot) => + structuredClone(snapshot), + ); + } + + upsert(snapshot: PersistedRunRecord): void { + this.upserts.push(structuredClone(snapshot)); + this.snapshots.set(snapshot.runId, structuredClone(snapshot)); + } + + delete(runId: string): void { + this.snapshots.delete(runId); + } + + close(): void {} +} + beforeEach(() => registerRunner("stub2", stub)); afterEach(() => unregisterRunner("stub2")); @@ -111,4 +139,14 @@ describe("fire()", () => { expect(handle.workflowName).toBe("fire-wf"); await runner.close(); }); + + it("persists the terminal snapshot to the configured store", async () => { + const store = new RecordingRunStore(); + const runner = createRunner({ store }); + await runner.run(wf, {}); + const snapshot = [...store.snapshots.values()][0]; + expect(snapshot?.state).toBe("done"); + expect(snapshot?.result).toBeDefined(); + await runner.close(); + }); }); diff --git a/packages/server/src/__tests__/runner.hitl.test.ts b/packages/server/src/__tests__/runner.hitl.test.ts index a88605f..a069c0d 100644 --- a/packages/server/src/__tests__/runner.hitl.test.ts +++ b/packages/server/src/__tests__/runner.hitl.test.ts @@ -4,10 +4,11 @@ import { registerRunner, unregisterRunner, } from "@ageflow/core"; -import type { Runner as AgentRunner } from "@ageflow/core"; +import type { Runner as AgentRunner, RunHandle } from "@ageflow/core"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { z } from "zod"; import { createRunner } from "../runner.js"; +import type { PersistedRunRecord, RunStore } from "../types.js"; const stub: AgentRunner = { validate: async () => ({ ok: true }), @@ -30,6 +31,31 @@ const wf = defineWorkflow({ tasks: { t: { agent, input: {} } }, }); +class RecordingRunStore implements RunStore { + readonly snapshots = new Map(); + + get(runId: string): PersistedRunRecord | undefined { + const snapshot = this.snapshots.get(runId); + return snapshot !== undefined ? structuredClone(snapshot) : undefined; + } + + list(): readonly PersistedRunRecord[] { + return [...this.snapshots.values()].map((snapshot) => + structuredClone(snapshot), + ); + } + + upsert(snapshot: PersistedRunRecord): void { + this.snapshots.set(snapshot.runId, structuredClone(snapshot)); + } + + delete(runId: string): void { + this.snapshots.delete(runId); + } + + close(): void {} +} + beforeEach(() => registerRunner("stub", stub)); afterEach(() => unregisterRunner("stub")); @@ -147,4 +173,26 @@ describe("async HITL", () => { if (id) expect(() => runner.resume(id, true)).toThrow(); await runner.close(); }); + + it("persists awaiting-checkpoint snapshots to the store", async () => { + const store = new RecordingRunStore(); + const runner = createRunner({ store }); + const gen = runner.stream(wf, {}); + let step = await gen.next(); + while (!step.done && step.value.type !== "checkpoint") { + step = await gen.next(); + } + expect(step.done).toBe(false); + const snapshot = [...store.snapshots.values()][0]; + expect(snapshot?.state).toBe("awaiting-checkpoint"); + expect(snapshot?.pendingCheckpoint?.taskName).toBe("t"); + if (!step.done) { + runner.resume(step.value.runId, true); + } + let next = await gen.next(); + while (!next.done) { + next = await gen.next(); + } + await runner.close(); + }); }); diff --git a/packages/server/src/__tests__/runner.persistence.test.ts b/packages/server/src/__tests__/runner.persistence.test.ts new file mode 100644 index 0000000..bea352e --- /dev/null +++ b/packages/server/src/__tests__/runner.persistence.test.ts @@ -0,0 +1,101 @@ +import { + defineAgent, + defineWorkflow, + registerRunner, + unregisterRunner, +} from "@ageflow/core"; +import type { Runner as AgentRunner } from "@ageflow/core"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { z } from "zod"; +import { createRunner } from "../runner.js"; +import type { PersistedRunRecord, RunStore } from "../types.js"; + +const stub: AgentRunner = { + validate: async () => ({ ok: true }), + spawn: async () => ({ + stdout: JSON.stringify({ ok: true }), + sessionHandle: "replay", + tokensIn: 0, + tokensOut: 0, + }), +}; + +const agent = defineAgent({ + runner: "replay-stub", + input: z.object({ q: z.string() }), + output: z.object({ ok: z.boolean() }), + prompt: () => "p", +}); + +const workflow = defineWorkflow({ + name: "replay-workflow", + tasks: { t: { agent, input: { q: "seed" } } }, +}); + +class RecordingRunStore implements RunStore { + readonly snapshots = new Map(); + + get(runId: string): PersistedRunRecord | undefined { + const snapshot = this.snapshots.get(runId); + return snapshot !== undefined ? structuredClone(snapshot) : undefined; + } + + list(): readonly PersistedRunRecord[] { + return [...this.snapshots.values()].map((snapshot) => + structuredClone(snapshot), + ); + } + + upsert(snapshot: PersistedRunRecord): void { + this.snapshots.set(snapshot.runId, structuredClone(snapshot)); + } + + delete(runId: string): void { + this.snapshots.delete(runId); + } + + close(): void {} +} + +beforeEach(() => registerRunner("replay-stub", stub)); +afterEach(() => unregisterRunner("replay-stub")); + +describe("runner recovery", () => { + it("replays a stored running job under the same runId", async () => { + const store = new RecordingRunStore(); + const runId = "replay-run"; + store.upsert({ + runId, + workflowName: workflow.name, + state: "running", + createdAt: 1_700_000_000_000, + lastEventAt: 1_700_000_000_000, + input: { q: "seed" }, + }); + + const runner = createRunner({ store }); + let spawnCalls = 0; + const originalSpawn = stub.spawn; + stub.spawn = async (...args) => { + spawnCalls += 1; + return originalSpawn(...args); + }; + + try { + runner.recover?.(workflow); + + for (let i = 0; i < 50; i += 1) { + if (store.get(runId)?.state === "done") break; + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + expect(spawnCalls).toBeGreaterThan(0); + expect(runner.get(runId)?.runId).toBe(runId); + expect(runner.get(runId)?.state).toBe("done"); + expect(store.get(runId)?.state).toBe("done"); + } finally { + stub.spawn = originalSpawn; + await runner.close(); + } + }); +}); diff --git a/packages/server/src/__tests__/types.test-d.ts b/packages/server/src/__tests__/types.test-d.ts index 278c4b9..0c33195 100644 --- a/packages/server/src/__tests__/types.test-d.ts +++ b/packages/server/src/__tests__/types.test-d.ts @@ -1,7 +1,7 @@ import type { WorkflowDef } from "@ageflow/core"; import { describe, expectTypeOf, it } from "vitest"; import { createRunner } from "../runner.js"; -import type { RunHandle, Runner, WorkflowResult } from "../types.js"; +import type { RunHandle, RunStore, Runner, WorkflowResult } from "../types.js"; // Use a real but minimal tasks map to avoid the {} ban. type EmptyTasks = Record; @@ -43,4 +43,15 @@ describe("types", () => { expectTypeOf(r.get("id")).toEqualTypeOf(); expectTypeOf(r.list()).toMatchTypeOf(); }); + + it("createRunner accepts a RunStore", () => { + const store: RunStore = { + get: () => undefined, + list: () => [], + upsert: () => {}, + delete: () => {}, + close: () => {}, + }; + expectTypeOf(createRunner({ store })).toMatchTypeOf(); + }); }); diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index c2731e7..4bce68b 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -1,12 +1,15 @@ export { createRunner } from "./runner.js"; export type { FireOptions, + PersistedRunRecord, + RunStore, Runner, RunnerConfig, RunOptions, RunHandle, WorkflowResult, } from "./types.js"; +export { InMemoryRunStore } from "./run-store.js"; export { CheckpointTimeoutError, HitlRejectedError, diff --git a/packages/server/src/run-handle.ts b/packages/server/src/run-handle.ts index a2bd402..b8d2ad0 100644 --- a/packages/server/src/run-handle.ts +++ b/packages/server/src/run-handle.ts @@ -5,15 +5,20 @@ import type { WorkflowMetrics, } from "@ageflow/core"; import { CheckpointTimeoutError } from "./errors.js"; +import type { PersistedRunRecord } from "./run-store.js"; export interface CreateHandleArgs { readonly runId: string; readonly workflowName: string; + readonly input?: unknown; + readonly snapshot?: PersistedRunRecord; + readonly persist?: (snapshot: PersistedRunRecord) => void; } export interface PendingCheckpoint { readonly event: CheckpointEvent; readonly resolve: (approved: boolean) => void; + readonly recoveredFromStore?: boolean; } export class InternalRunHandle { @@ -21,6 +26,9 @@ export class InternalRunHandle { readonly workflowName: string; readonly createdAt: number; readonly abort: AbortController; + readonly recoveredFromStore: boolean; + readonly input?: unknown; + private readonly persist: ((snapshot: PersistedRunRecord) => void) | undefined; state: RunState = "running"; lastEventAt: number; @@ -31,15 +39,45 @@ export class InternalRunHandle { constructor(args: CreateHandleArgs) { this.runId = args.runId; this.workflowName = args.workflowName; - this.createdAt = Date.now(); - this.lastEventAt = this.createdAt; + this.recoveredFromStore = args.snapshot !== undefined; + this.persist = args.persist; + this.createdAt = args.snapshot?.createdAt ?? Date.now(); + this.lastEventAt = args.snapshot?.lastEventAt ?? this.createdAt; this.abort = new AbortController(); + this.input = args.input ?? args.snapshot?.input; + if (args.snapshot !== undefined) { + this.state = args.snapshot.state; + if (args.snapshot.pendingCheckpoint !== undefined) { + this.pendingCheckpoint = { + event: args.snapshot.pendingCheckpoint, + resolve: () => {}, + recoveredFromStore: true, + }; + } + if (args.snapshot.result !== undefined) { + this.result = args.snapshot.result; + } + if (args.snapshot.error !== undefined) { + this.error = new Error(args.snapshot.error.message); + this.error.name = args.snapshot.error.name; + } + } } touch(): void { this.lastEventAt = Date.now(); } + private persistSnapshot(): void { + if (!this.persist) return; + const snapshot = this.snapshot(); + const persisted: PersistedRunRecord = { + ...snapshot, + ...(this.input !== undefined ? { input: this.input } : {}), + }; + this.persist(persisted); + } + markAwaitingCheckpoint( event: CheckpointEvent, resolve: (approved: boolean) => void, @@ -47,6 +85,7 @@ export class InternalRunHandle { this.state = "awaiting-checkpoint"; this.pendingCheckpoint = { event, resolve }; this.touch(); + this.persistSnapshot(); } clearCheckpoint(): void { @@ -54,6 +93,7 @@ export class InternalRunHandle { delete this.pendingCheckpoint; this.state = "running"; this.touch(); + this.persistSnapshot(); } markDone(result: { @@ -65,6 +105,7 @@ export class InternalRunHandle { // biome-ignore lint/performance/noDelete: exactOptionalPropertyTypes requires delete over undefined assignment delete this.pendingCheckpoint; this.touch(); + this.persistSnapshot(); } markFailed(err: Error): void { @@ -73,6 +114,7 @@ export class InternalRunHandle { // biome-ignore lint/performance/noDelete: exactOptionalPropertyTypes requires delete over undefined assignment delete this.pendingCheckpoint; this.touch(); + this.persistSnapshot(); } markCancelled(): void { @@ -80,6 +122,7 @@ export class InternalRunHandle { // biome-ignore lint/performance/noDelete: exactOptionalPropertyTypes requires delete over undefined assignment delete this.pendingCheckpoint; this.touch(); + this.persistSnapshot(); } snapshot(): RunHandle { @@ -103,9 +146,20 @@ export class InternalRunHandle { return snap; } + persistedSnapshot(): PersistedRunRecord { + return { + ...this.snapshot(), + ...(this.input !== undefined ? { input: this.input } : {}), + }; + } + autoRejectCheckpoint(): void { const pc = this.pendingCheckpoint; if (!pc) return; + if (pc.recoveredFromStore) { + this.markFailed(new CheckpointTimeoutError(pc.event.taskName)); + return; + } pc.resolve(false); this.markFailed(new CheckpointTimeoutError(pc.event.taskName)); } diff --git a/packages/server/src/run-registry.ts b/packages/server/src/run-registry.ts index bbb817c..55c3128 100644 --- a/packages/server/src/run-registry.ts +++ b/packages/server/src/run-registry.ts @@ -1,19 +1,33 @@ import type { RunHandle } from "@ageflow/core"; import { type CreateHandleArgs, InternalRunHandle } from "./run-handle.js"; +import { InMemoryRunStore, type RunStore } from "./run-store.js"; export interface RunRegistryConfig { readonly ttlMs: number; readonly checkpointTtlMs: number; readonly reaperIntervalMs: number; + readonly store?: RunStore; } export class RunRegistry { private readonly handles = new Map(); private readonly cfg: RunRegistryConfig; + private readonly store: RunStore; private readonly timer: ReturnType; constructor(cfg: RunRegistryConfig) { this.cfg = cfg; + this.store = cfg.store ?? new InMemoryRunStore(); + for (const snapshot of this.store.list()) { + const handle = new InternalRunHandle({ + runId: snapshot.runId, + workflowName: snapshot.workflowName, + snapshot, + input: snapshot.input, + persist: (next) => this.store.upsert(next), + }); + this.handles.set(handle.runId, handle); + } this.timer = setInterval(() => this.sweep(), cfg.reaperIntervalMs); if (typeof (this.timer as { unref?: () => void }).unref === "function") { (this.timer as { unref: () => void }).unref(); @@ -21,8 +35,12 @@ export class RunRegistry { } create(args: CreateHandleArgs): InternalRunHandle { - const h = new InternalRunHandle(args); + const h = new InternalRunHandle({ + ...args, + persist: (snapshot) => this.store.upsert(snapshot), + }); this.handles.set(h.runId, h); + this.store.upsert(h.persistedSnapshot()); return h; } @@ -42,6 +60,12 @@ export class RunRegistry { clearInterval(this.timer); } + close(): void { + this.stop(); + this.store.close(); + this.handles.clear(); + } + private sweep(): void { const now = Date.now(); for (const [id, h] of this.handles) { @@ -56,6 +80,7 @@ export class RunRegistry { h.state === "done" || h.state === "failed" || h.state === "cancelled"; if (terminal && now - h.lastEventAt > this.cfg.ttlMs) { this.handles.delete(id); + this.store.delete(id); } } } diff --git a/packages/server/src/run-store.ts b/packages/server/src/run-store.ts new file mode 100644 index 0000000..3061245 --- /dev/null +++ b/packages/server/src/run-store.ts @@ -0,0 +1,38 @@ +import type { RunHandle } from "@ageflow/core"; + +export interface PersistedRunRecord extends RunHandle { + readonly input?: unknown; +} + +export interface RunStore { + get(runId: string): PersistedRunRecord | undefined; + list(): readonly PersistedRunRecord[]; + upsert(snapshot: PersistedRunRecord): void; + delete(runId: string): void; + close(): void; +} + +export class InMemoryRunStore implements RunStore { + private readonly runs = new Map(); + + get(runId: string): PersistedRunRecord | undefined { + const snapshot = this.runs.get(runId); + return snapshot !== undefined ? structuredClone(snapshot) : undefined; + } + + list(): readonly PersistedRunRecord[] { + return [...this.runs.values()].map((snapshot) => structuredClone(snapshot)); + } + + upsert(snapshot: PersistedRunRecord): void { + this.runs.set(snapshot.runId, structuredClone(snapshot)); + } + + delete(runId: string): void { + this.runs.delete(runId); + } + + close(): void { + this.runs.clear(); + } +} diff --git a/packages/server/src/runner.ts b/packages/server/src/runner.ts index 63b6dc0..0784763 100644 --- a/packages/server/src/runner.ts +++ b/packages/server/src/runner.ts @@ -8,8 +8,9 @@ import type { import { shutdownAllRunners } from "@ageflow/core"; import { WorkflowExecutor, type WorkflowResult } from "@ageflow/executor"; import { InvalidRunStateError, RunNotFoundError } from "./errors.js"; -import { createDeferred } from "./run-handle.js"; +import { createDeferred, type InternalRunHandle } from "./run-handle.js"; import { RunRegistry } from "./run-registry.js"; +import { InMemoryRunStore } from "./run-store.js"; import type { FireOptions, RunOptions, Runner, RunnerConfig } from "./types.js"; const DEFAULT_TTL_MS = 5 * 60_000; @@ -17,21 +18,26 @@ const DEFAULT_CHECKPOINT_TTL_MS = 60 * 60_000; const DEFAULT_REAPER_INTERVAL_MS = 60_000; export function createRunner(config: RunnerConfig = {}): Runner { + const store = config.store ?? new InMemoryRunStore(); const registry = new RunRegistry({ ttlMs: config.ttlMs ?? DEFAULT_TTL_MS, checkpointTtlMs: config.checkpointTtlMs ?? DEFAULT_CHECKPOINT_TTL_MS, reaperIntervalMs: config.reaperIntervalMs ?? DEFAULT_REAPER_INTERVAL_MS, + store, }); const generateRunId = config.generateRunId ?? (() => crypto.randomUUID()); + const replayedRunIds = new Set(); async function* streamImpl( workflow: WorkflowDef, input: unknown, options: RunOptions | undefined, preAllocatedRunId?: string, + existingHandle?: InternalRunHandle, ): AsyncGenerator, void> { const runId = preAllocatedRunId ?? generateRunId(); - const handle = registry.create({ runId, workflowName: workflow.name }); + const handle = + existingHandle ?? registry.create({ runId, workflowName: workflow.name, input }); // Combine caller signal + internal abort. if (options?.signal) { @@ -179,6 +185,9 @@ export function createRunner(config: RunnerConfig = {}): Runner { if (h.state !== "awaiting-checkpoint" || !h.pendingCheckpoint) { throw new InvalidRunStateError(runId, h.state); } + if (h.pendingCheckpoint.recoveredFromStore === true) { + throw new InvalidRunStateError(runId, h.state); + } const { resolve } = h.pendingCheckpoint; h.clearCheckpoint(); resolve(approved); @@ -206,9 +215,49 @@ export function createRunner(config: RunnerConfig = {}): Runner { get: (runId) => registry.get(runId), list: () => registry.list(), + recover(workflow: WorkflowDef): void { + for (const record of store.list()) { + if (record.workflowName !== workflow.name) continue; + if ( + record.state !== "running" && + record.state !== "awaiting-checkpoint" + ) { + continue; + } + if (replayedRunIds.has(record.runId)) continue; + const handle = registry.getInternal(record.runId); + if (!handle) continue; + replayedRunIds.add(record.runId); + void replayRecoveredRun(workflow, handle, record.input).catch( + (err: unknown) => { + const error = + err instanceof Error ? err : new Error(String(err)); + handle.markFailed(error); + }, + ); + } + }, async close(): Promise { - registry.stop(); + registry.close(); await shutdownAllRunners(); }, }; + + async function replayRecoveredRun( + workflow: WorkflowDef, + handle: InternalRunHandle, + input: unknown, + ): Promise { + // Rehydrate the run under the same runId by replaying from the stored input. + // The handle is already in the registry with the recovered snapshot state. + const gen = streamImpl(workflow, input, undefined, handle.runId, handle); + try { + let step: IteratorResult>; + do { + step = await gen.next(); + } while (!step.done); + } catch { + // The outer catch in the caller marks the handle failed and persists it. + } + } } diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index 11e3148..ad5f3cb 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -6,7 +6,9 @@ import type { WorkflowEvent, } from "@ageflow/core"; import type { WorkflowResult } from "@ageflow/executor"; +import type { RunStore } from "./run-store.js"; +export type { PersistedRunRecord, RunStore } from "./run-store.js"; export type { RunHandle } from "@ageflow/core"; export type { WorkflowResult } from "@ageflow/executor"; @@ -30,6 +32,8 @@ export interface RunnerConfig { readonly reaperIntervalMs?: number; /** runId generator. Default: crypto.randomUUID. */ readonly generateRunId?: () => string; + /** Optional run snapshot store. Defaults to an in-memory store. */ + readonly store?: RunStore; } export interface Runner { @@ -55,6 +59,8 @@ export interface Runner { cancel(runId: string): void; get(runId: string): RunHandle | undefined; list(): readonly RunHandle[]; + /** Internal recovery hook used by async job hydration. */ + recover?(workflow: WorkflowDef): void; /** Stop the reaper and shut down all registered runners. Process-level teardown. */ close(): Promise; }