diff --git a/bun.lock b/bun.lock index 589474a..5c4714e 100644 --- a/bun.lock +++ b/bun.lock @@ -104,7 +104,7 @@ }, "packages/cli": { "name": "@ageflow/cli", - "version": "0.6.0", + "version": "0.7.0", "bin": { "agentwf": "./dist/bin.js", }, @@ -206,7 +206,7 @@ }, "packages/mcp-server": { "name": "@ageflow/mcp-server", - "version": "0.7.0", + "version": "0.8.0", "dependencies": { "@ageflow/core": "^0.6.0", "@ageflow/executor": "^0.7.0", diff --git a/packages/cli/package.json b/packages/cli/package.json index ec98ca9..d4b0ae9 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@ageflow/cli", - "version": "0.6.0", + "version": "0.7.0", "description": "CLI for ageflow \u2014 agentwf run / validate / dry-run / init", "homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/cli", "type": "module", @@ -26,7 +26,7 @@ "@ageflow/executor": "^0.7.0", "@ageflow/learning": "^0.5.0", "@ageflow/learning-sqlite": "^0.4.1", - "@ageflow/mcp-server": "^0.7.0", + "@ageflow/mcp-server": "^0.8.0", "@ageflow/server": "^0.6.0", "@ageflow/server-sqlite": "^0.2.0", "chalk": "^5.3.0", diff --git a/packages/cli/src/__tests__/mcp-serve.test.ts b/packages/cli/src/__tests__/mcp-serve.test.ts index 52b5486..80a6da2 100644 --- a/packages/cli/src/__tests__/mcp-serve.test.ts +++ b/packages/cli/src/__tests__/mcp-serve.test.ts @@ -121,6 +121,31 @@ describe("parseMcpServeArgs", () => { expect(args.maxTurns).toBeNull(); }); + it("parses --max-concurrent-jobs", () => { + const args = parseMcpServeArgs([ + "wf.ts", + "--async", + "--max-concurrent-jobs", + "3", + ]); + expect(args.maxConcurrentJobs).toBe(3); + }); + + it("defaults maxConcurrentJobs to undefined", () => { + const args = parseMcpServeArgs(["wf.ts"]); + expect(args.maxConcurrentJobs).toBeUndefined(); + }); + + it("parses --max-concurrent-jobs-per-workflow", () => { + const args = parseMcpServeArgs([ + "wf.ts", + "--async", + "--max-concurrent-jobs-per-workflow", + "4", + ]); + expect(args.maxConcurrentJobsPerWorkflow).toBe(4); + }); + it("parses --hitl auto", () => { const args = parseMcpServeArgs(["wf.ts", "--hitl", "auto"]); expect(args.hitlStrategy).toBe("auto"); @@ -178,6 +203,38 @@ describe("parseMcpServeArgs", () => { ); }); + it("throws on invalid --max-concurrent-jobs value", () => { + expect(() => + parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "0"]), + ).toThrow(/positive integer/); + }); + + it("throws on negative --max-concurrent-jobs value", () => { + expect(() => + parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "-2"]), + ).toThrow(/positive integer/); + }); + + it("throws on invalid --max-concurrent-jobs-per-workflow value", () => { + expect(() => + parseMcpServeArgs([ + "wf.ts", + "--async", + "--max-concurrent-jobs-per-workflow", + "1.5", + ]), + ).toThrow(/positive integer/); + }); + + it("requires --async for concurrency flags", () => { + expect(() => + parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "2"]), + ).toThrow(/requires --async/); + expect(() => + parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs-per-workflow", "2"]), + ).toThrow(/requires --async/); + }); + it("throws on unknown flag", () => { expect(() => parseMcpServeArgs(["wf.ts", "--unknown-flag"])).toThrow( /Unknown flag/, diff --git a/packages/cli/src/commands/mcp-serve.ts b/packages/cli/src/commands/mcp-serve.ts index 04389f2..fde8241 100644 --- a/packages/cli/src/commands/mcp-serve.ts +++ b/packages/cli/src/commands/mcp-serve.ts @@ -10,6 +10,8 @@ * --no-max-duration disable duration ceiling * --max-turns maximum agent turns * --no-max-turns disable turns ceiling + * --max-concurrent-jobs max concurrent running jobs + * --max-concurrent-jobs-per-workflow max concurrent running jobs for this workflow * --hitl HITL strategy: elicit | auto | fail (default: elicit) * --name MCP server name (default: workflow name) * --log-file write stderr log to a file @@ -24,7 +26,11 @@ import fs from "node:fs"; import path from "node:path"; import type { WorkflowDef } from "@ageflow/core"; -import type { CliCeilings, HitlStrategy } from "@ageflow/mcp-server"; +import type { + CliCeilings, + ConcurrencyConfig, + HitlStrategy, +} from "@ageflow/mcp-server"; import { createHttpTransport, createSingleWorkflowServer, @@ -39,6 +45,8 @@ export interface McpServeArgs { readonly maxCostUsd?: number | null; readonly maxDurationSec?: number | null; readonly maxTurns?: number | null; + readonly maxConcurrentJobs?: number; + readonly maxConcurrentJobsPerWorkflow?: number; readonly hitlStrategy: HitlStrategy; readonly serverName?: string; readonly logFile?: string; @@ -86,6 +94,8 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { let maxCostUsd: number | null | undefined = undefined; let maxDurationSec: number | null | undefined = undefined; let maxTurns: number | null | undefined = undefined; + let maxConcurrentJobs: number | undefined = undefined; + let maxConcurrentJobsPerWorkflow: number | undefined = undefined; let hitlStrategy: HitlStrategy = "elicit"; let serverName: string | undefined = undefined; let logFile: string | undefined = undefined; @@ -155,6 +165,38 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { maxTurns = null; break; + case "--max-concurrent-jobs": { + const val = args[++i]; + if (val === undefined || val.startsWith("--")) { + throw new Error("--max-concurrent-jobs requires a numeric argument"); + } + const n = Number(val); + if (!Number.isInteger(n) || n <= 0) { + throw new Error( + `--max-concurrent-jobs must be a positive integer, got: ${val}`, + ); + } + maxConcurrentJobs = n; + break; + } + + case "--max-concurrent-jobs-per-workflow": { + const val = args[++i]; + if (val === undefined || val.startsWith("--")) { + throw new Error( + "--max-concurrent-jobs-per-workflow requires a numeric argument", + ); + } + const n = Number(val); + if (!Number.isInteger(n) || n <= 0) { + throw new Error( + `--max-concurrent-jobs-per-workflow must be a positive integer, got: ${val}`, + ); + } + maxConcurrentJobsPerWorkflow = n; + break; + } + case "--hitl": { const val = args[++i]; if (val !== "elicit" && val !== "auto" && val !== "fail") { @@ -273,9 +315,13 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { asyncMode !== true && (jobTtlMs !== undefined || jobCheckpointTtlMs !== undefined || - jobDb !== undefined) + jobDb !== undefined || + maxConcurrentJobs !== undefined || + maxConcurrentJobsPerWorkflow !== undefined) ) { - throw new Error("--job-ttl / --checkpoint-ttl / --job-db requires --async"); + throw new Error( + "--job-ttl / --checkpoint-ttl / --job-db / --max-concurrent-jobs / --max-concurrent-jobs-per-workflow requires --async", + ); } if (httpPort !== undefined && httpMode !== true) { @@ -311,6 +357,10 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs { ...(maxCostUsd !== undefined ? { maxCostUsd } : {}), ...(maxDurationSec !== undefined ? { maxDurationSec } : {}), ...(maxTurns !== undefined ? { maxTurns } : {}), + ...(maxConcurrentJobs !== undefined ? { maxConcurrentJobs } : {}), + ...(maxConcurrentJobsPerWorkflow !== undefined + ? { maxConcurrentJobsPerWorkflow } + : {}), ...(serverName !== undefined ? { serverName } : {}), ...(logFile !== undefined ? { logFile } : {}), ...(asyncMode !== undefined ? { async: asyncMode } : {}), @@ -370,6 +420,21 @@ async function runMcpServe(rawArgv: string[]): Promise { : {}), ...(parsed.maxTurns !== undefined ? { maxTurns: parsed.maxTurns } : {}), }; + const concurrency: ConcurrencyConfig | undefined = + parsed.maxConcurrentJobs !== undefined || + parsed.maxConcurrentJobsPerWorkflow !== undefined + ? { + ...(parsed.maxConcurrentJobs !== undefined + ? { maxConcurrentJobs: parsed.maxConcurrentJobs } + : {}), + ...(parsed.maxConcurrentJobsPerWorkflow !== undefined + ? { + maxConcurrentJobsPerWorkflow: + parsed.maxConcurrentJobsPerWorkflow, + } + : {}), + } + : undefined; // Determine server name const serverName = parsed.serverName ?? workflow.name; @@ -390,6 +455,7 @@ async function runMcpServe(rawArgv: string[]): Promise { workflow, cliCeilings, hitlStrategy: parsed.hitlStrategy, + ...(concurrency !== undefined ? { concurrency } : {}), stderr, ...(parsed.async === true ? { async: true } : {}), ...(parsed.jobTtlMs !== undefined ? { jobTtlMs: parsed.jobTtlMs } : {}), @@ -474,6 +540,8 @@ export function registerMcpCommand(program: Command): void { " --no-max-duration disable duration ceiling\n" + " --max-turns max agent turns\n" + " --no-max-turns disable turns ceiling\n" + + " --max-concurrent-jobs max concurrent running jobs\n" + + " --max-concurrent-jobs-per-workflow max concurrent running jobs for this workflow\n" + " --hitl elicit | auto | fail (default: elicit)\n" + " --name MCP server name\n" + " --log-file log stderr to file\n" + diff --git a/packages/mcp-server/package.json b/packages/mcp-server/package.json index 5bb861e..7004c3d 100644 --- a/packages/mcp-server/package.json +++ b/packages/mcp-server/package.json @@ -1,6 +1,6 @@ { "name": "@ageflow/mcp-server", - "version": "0.7.0", + "version": "0.8.0", "description": "Expose ageflow workflows as MCP tools (stdio transport, progress streaming, HITL via elicitation).", "homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/mcp-server", "type": "module", diff --git a/packages/mcp-server/src/__tests__/errors.test.ts b/packages/mcp-server/src/__tests__/errors.test.ts index 2ca4fba..3365a88 100644 --- a/packages/mcp-server/src/__tests__/errors.test.ts +++ b/packages/mcp-server/src/__tests__/errors.test.ts @@ -65,6 +65,9 @@ describe("async-mode error mapping (#18)", () => { expect(ErrorCode.JOB_CANCELLED).toBe("JOB_CANCELLED"); expect(ErrorCode.INVALID_RUN_STATE).toBe("INVALID_RUN_STATE"); expect(ErrorCode.ASYNC_MODE_DISABLED).toBe("ASYNC_MODE_DISABLED"); + expect(ErrorCode.CONCURRENCY_LIMIT_EXCEEDED).toBe( + "CONCURRENCY_LIMIT_EXCEEDED", + ); }); it("maps RunNotFoundError → JOB_NOT_FOUND", () => { diff --git a/packages/mcp-server/src/__tests__/integration/async-mode.test.ts b/packages/mcp-server/src/__tests__/integration/async-mode.test.ts index e1137af..5b31fa6 100644 --- a/packages/mcp-server/src/__tests__/integration/async-mode.test.ts +++ b/packages/mcp-server/src/__tests__/integration/async-mode.test.ts @@ -259,6 +259,48 @@ describe("async mode: inflight lock (#18)", () => { release(); h.dispose?.(); }); + + it("applies maxConcurrentJobsPerWorkflow in single-workflow mode", async () => { + const h = createMcpServer({ + workflow, + cliCeilings: {}, + hitlStrategy: "auto", + async: true, + concurrency: { + maxConcurrentJobs: 2, + maxConcurrentJobsPerWorkflow: 1, + }, + }); + + let release!: () => void; + h._testRunExecutor = () => + new Promise((res) => { + release = () => res({ a: "ok" }); + }); + + const first = h.callTool("start_ask", { q: "1" }); + for (let i = 0; i < 50 && typeof release !== "function"; i += 1) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + expect(typeof release).toBe("function"); + + const second = await h.callTool("start_ask", { q: "2" }); + expect(second.isError).toBe(true); + if (second.isError) { + expect(second.structuredContent.errorCode).toBe("BUSY"); + expect(second.structuredContent.context).toMatchObject({ + scope: "workflow", + kind: "start", + workflowName: "ask", + limit: 1, + active: 1, + }); + } + + release(); + await first; + h.dispose?.(); + }); }); describe("async mode: cancel_workflow (#18)", () => { diff --git a/packages/mcp-server/src/__tests__/programmatic-api.test.ts b/packages/mcp-server/src/__tests__/programmatic-api.test.ts index b8ffa14..73b69ea 100644 --- a/packages/mcp-server/src/__tests__/programmatic-api.test.ts +++ b/packages/mcp-server/src/__tests__/programmatic-api.test.ts @@ -242,6 +242,82 @@ describe("createMcpServer() — multi-workflow", () => { }); }); +describe("createMcpServer() — concurrency", () => { + it("shares server-wide start limits and applies per-workflow overrides", async () => { + const handle = createMcpServer({ + workflows: [greetWorkflow, summarizeWorkflow], + async: true, + concurrency: { + maxConcurrentJobs: 2, + maxConcurrentJobsPerWorkflow: 1, + }, + }); + + const releases: Record void> = {}; + handle._routerHandle._testRunExecutor = async (args: unknown) => { + const input = args as Record; + if (typeof input.name === "string") { + return await new Promise((resolve) => { + releases.greet = () => resolve({ greeting: `hello, ${input.name}!` }); + }); + } + if (typeof input.text === "string") { + return await new Promise((resolve) => { + releases.summarize = () => + resolve({ summary: `short: ${input.text}` }); + }); + } + throw new Error("unexpected input"); + }; + + const summarize1 = handle._routerHandle.callTool("start_summarize", { + text: "first", + }); + const greet1 = handle._routerHandle.callTool("start_greet", { + name: "Bob", + }); + + const summarize2 = await handle._routerHandle.callTool("start_summarize", { + text: "second", + }); + expect(summarize2.isError).toBe(true); + if (summarize2.isError) { + expect(summarize2.structuredContent.errorCode).toBe("BUSY"); + expect(summarize2.structuredContent.context).toMatchObject({ + scope: "workflow", + kind: "start", + workflowName: "summarize", + limit: 1, + active: 1, + }); + } + + releases.greet?.(); + releases.summarize?.(); + + await expect(summarize1).resolves.toMatchObject({ isError: false }); + await expect(greet1).resolves.toMatchObject({ isError: false }); + }); + + it("throws on invalid concurrency config values", () => { + expect(() => + createMcpServer({ + workflows: greetWorkflow, + concurrency: { maxConcurrentJobs: 0 }, + }), + ).toThrow(/positive integer/); + }); + + it("throws when perWorkflow references an unknown workflow", () => { + expect(() => + createMcpServer({ + workflows: [greetWorkflow, summarizeWorkflow], + concurrency: { perWorkflow: { missing: 1 } }, + }), + ).toThrow(/unknown workflow name/); + }); +}); + describe("createMcpServer() — middleware", () => { it("middleware is called for each callTool invocation", async () => { const calls: McpMiddlewareRequest[] = []; diff --git a/packages/mcp-server/src/concurrency.ts b/packages/mcp-server/src/concurrency.ts new file mode 100644 index 0000000..91af210 --- /dev/null +++ b/packages/mcp-server/src/concurrency.ts @@ -0,0 +1,282 @@ +import { ErrorCode, McpServerError } from "./errors.js"; +import type { + ConcurrencyConfig, + ResolvedConcurrencyConfig, + WorkflowConcurrencyConfig, +} from "./types.js"; + +export interface ConcurrencyPermit { + release(): void; +} + +export interface ConcurrencyController { + acquireStart(workflowName: string): ConcurrencyPermit | McpServerError; + acquireJob(workflowName: string): ConcurrencyPermit | McpServerError; +} + +const RESOLVED_CONCURRENCY = Symbol("ageflow.resolvedConcurrency"); + +type InternalResolvedConcurrencyConfig = ResolvedConcurrencyConfig & { + readonly [RESOLVED_CONCURRENCY]: true; +}; + +type Kind = "start" | "job"; + +class Counter { + private _active = 0; + + constructor(readonly limit: number | null) {} + + get active(): number { + return this._active; + } + + tryAcquire(): boolean { + if (this.limit !== null && this._active >= this.limit) { + return false; + } + this._active += 1; + return true; + } + + release(): void { + if (this._active > 0) { + this._active -= 1; + } + } +} + +const controllerCache = new WeakMap< + InternalResolvedConcurrencyConfig, + ConcurrencyController +>(); + +export function resolveConcurrencyConfig( + config?: ConcurrencyConfig | ResolvedConcurrencyConfig, +): ResolvedConcurrencyConfig { + if (config !== undefined && isResolvedConcurrencyConfig(config)) { + return config; + } + + const raw = (config ?? {}) as ConcurrencyConfig; + const workflows: Record = {}; + + for (const [workflowName, workflowConfig] of Object.entries( + raw.workflows ?? {}, + )) { + workflows[workflowName] = normalizeWorkflowConfig( + workflowConfig, + `workflows.${workflowName}`, + ); + } + + for (const [workflowName, maxConcurrentStarts] of Object.entries( + (raw.perWorkflow ?? {}) as Readonly>, + )) { + const nextConfig = workflows[workflowName] ?? {}; + workflows[workflowName] = { + ...nextConfig, + maxConcurrentStarts: normalizeLimit( + maxConcurrentStarts, + `perWorkflow.${workflowName}`, + ), + }; + } + + const resolved = { + maxConcurrentStarts: + raw.maxConcurrentStarts !== undefined + ? normalizeLimit(raw.maxConcurrentStarts, "maxConcurrentStarts") + : raw.maxConcurrentJobs !== undefined + ? normalizeLimit(raw.maxConcurrentJobs, "maxConcurrentJobs") + : 1, + maxConcurrentJobs: + raw.maxConcurrentJobs !== undefined + ? normalizeLimit(raw.maxConcurrentJobs, "maxConcurrentJobs") + : null, + workflows, + } as InternalResolvedConcurrencyConfig; + + Object.defineProperty(resolved, RESOLVED_CONCURRENCY, { + value: true, + enumerable: false, + configurable: false, + }); + + return resolved; +} + +export function createConcurrencyController( + config?: ConcurrencyConfig | ResolvedConcurrencyConfig, +): ConcurrencyController { + const resolved = resolveConcurrencyConfig( + config, + ) as InternalResolvedConcurrencyConfig; + const cached = controllerCache.get(resolved); + if (cached !== undefined) { + return cached; + } + + const serverCounters = { + start: new Counter(resolved.maxConcurrentStarts), + job: new Counter(resolved.maxConcurrentJobs), + }; + const workflowCounters = new Map>>(); + + const controller: ConcurrencyController = { + acquireStart(workflowName: string) { + return acquire("start", workflowName); + }, + acquireJob(workflowName: string) { + return acquire("job", workflowName); + }, + }; + + controllerCache.set(resolved, controller); + return controller; + + function acquire( + kind: Kind, + workflowName: string, + ): ConcurrencyPermit | McpServerError { + const workflowConfig = resolved.workflows[workflowName]; + const workflowLimit = workflowConfig?.[limitFieldFor(kind)]; + const workflowCounter = + workflowLimit !== undefined + ? getWorkflowCounter(workflowName, kind, workflowLimit) + : undefined; + + if (workflowCounter !== undefined && !workflowCounter.tryAcquire()) { + return makeLimitError({ + code: ErrorCode.BUSY, + scope: "workflow", + kind, + workflowName, + limit: workflowCounter.limit, + active: workflowCounter.active, + }); + } + + const serverCounter = serverCounters[kind]; + if (!serverCounter.tryAcquire()) { + workflowCounter?.release(); + return makeLimitError({ + code: ErrorCode.BUSY, + scope: "server", + kind, + limit: serverCounter.limit, + active: serverCounter.active, + }); + } + + let released = false; + return { + release() { + if (released) return; + released = true; + serverCounter.release(); + workflowCounter?.release(); + }, + }; + } + + function getWorkflowCounter( + workflowName: string, + kind: Kind, + limit: number | null, + ): Counter { + const existing = workflowCounters.get(workflowName); + const counter = existing?.[kind]; + if (counter !== undefined) { + return counter; + } + + const next = new Counter(limit); + const merged = { ...(existing ?? {}), [kind]: next } as Partial< + Record + >; + workflowCounters.set(workflowName, merged); + return next; + } +} + +function isResolvedConcurrencyConfig( + config: ConcurrencyConfig | ResolvedConcurrencyConfig, +): config is InternalResolvedConcurrencyConfig { + return ( + typeof config === "object" && + config !== null && + RESOLVED_CONCURRENCY in config + ); +} + +function normalizeWorkflowConfig( + config: WorkflowConcurrencyConfig, + path: string, +): WorkflowConcurrencyConfig { + const maxConcurrentStarts = + config.maxConcurrentStarts !== undefined + ? normalizeLimit( + config.maxConcurrentStarts, + `${path}.maxConcurrentStarts`, + ) + : config.maxConcurrentJobsPerWorkflow !== undefined + ? normalizeLimit( + config.maxConcurrentJobsPerWorkflow, + `${path}.maxConcurrentJobsPerWorkflow`, + ) + : undefined; + return { + ...(maxConcurrentStarts !== undefined ? { maxConcurrentStarts } : {}), + ...(config.maxConcurrentJobs !== undefined + ? { + maxConcurrentJobs: normalizeLimit( + config.maxConcurrentJobs, + `${path}.maxConcurrentJobs`, + ), + } + : {}), + }; +} + +function normalizeLimit(value: number | null, path: string): number | null { + if (value === null) { + return null; + } + if (!Number.isInteger(value) || value <= 0) { + throw new Error( + `${path} must be a positive integer${value === null ? " or null" : ""}, got: ${String(value)}`, + ); + } + return value; +} + +function limitFieldFor( + kind: Kind, +): "maxConcurrentStarts" | "maxConcurrentJobs" { + return kind === "start" ? "maxConcurrentStarts" : "maxConcurrentJobs"; +} + +function makeLimitError(args: { + code: typeof ErrorCode.BUSY; + scope: "server" | "workflow"; + kind: Kind; + limit: number | null; + active: number; + workflowName?: string; +}): McpServerError { + const message = + args.scope === "workflow" + ? `Workflow "${args.workflowName ?? "unknown"}" ${args.kind} concurrency limit reached (limit=${args.limit})` + : `Server ${args.kind} concurrency limit reached (limit=${args.limit})`; + + return new McpServerError(args.code, message, { + scope: args.scope, + kind: args.kind, + limit: args.limit, + active: args.active, + ...(args.workflowName !== undefined + ? { workflowName: args.workflowName } + : {}), + }); +} diff --git a/packages/mcp-server/src/errors.ts b/packages/mcp-server/src/errors.ts index 3d1a5aa..ccad7db 100644 --- a/packages/mcp-server/src/errors.ts +++ b/packages/mcp-server/src/errors.ts @@ -19,6 +19,7 @@ export const ErrorCode = { WORKFLOW_NOT_MCP_EXPOSABLE: "WORKFLOW_NOT_MCP_EXPOSABLE", RUNNER_PREFLIGHT_FAILED: "RUNNER_PREFLIGHT_FAILED", BUSY: "BUSY", + CONCURRENCY_LIMIT_EXCEEDED: "CONCURRENCY_LIMIT_EXCEEDED", SERVER_SHUTDOWN: "SERVER_SHUTDOWN", DAG_INVALID: "DAG_INVALID", JOB_NOT_FOUND: "JOB_NOT_FOUND", diff --git a/packages/mcp-server/src/index.ts b/packages/mcp-server/src/index.ts index b828e2d..eaadddd 100644 --- a/packages/mcp-server/src/index.ts +++ b/packages/mcp-server/src/index.ts @@ -37,6 +37,13 @@ export { } from "./job-store.js"; export { startStdioTransport } from "./stdio-transport.js"; export type { StdioTransportOptions } from "./stdio-transport.js"; -export type { CliCeilings, EffectiveCeilings, HitlStrategy } from "./types.js"; +export type { + CliCeilings, + ConcurrencyConfig, + EffectiveCeilings, + HitlStrategy, + ResolvedConcurrencyConfig, + WorkflowConcurrencyConfig, +} from "./types.js"; export { ErrorCode, McpServerError } from "./errors.js"; export { buildJobTools } from "./job-tools.js"; diff --git a/packages/mcp-server/src/job-dispatch.ts b/packages/mcp-server/src/job-dispatch.ts index a12bfb6..2b0ce5b 100644 --- a/packages/mcp-server/src/job-dispatch.ts +++ b/packages/mcp-server/src/job-dispatch.ts @@ -7,6 +7,7 @@ import type { } from "@ageflow/core"; import { registerRunner, unregisterRunner } from "@ageflow/core"; import { type RunStore, type Runner, createRunner } from "@ageflow/server"; +import type { ConcurrencyController } from "./concurrency.js"; import { ErrorCode, McpServerError, formatErrorResult } from "./errors.js"; import { JobEventRecorder } from "./job-event-recorder.js"; import { type PersistedJob, isExpired, toPersistedJob } from "./job-store.js"; @@ -33,8 +34,6 @@ export interface JobDispatchContext { _testOnComposedWorkflow?: (workflow: WorkflowDef) => void; /** Task count in the workflow (for progress.tasksTotal). */ readonly taskCount: number; - /** Called from fire() onComplete/onError to release the inflight lock. */ - readonly releaseInflight: () => void; } function sweepStore(ctx: JobDispatchContext): void { @@ -94,6 +93,8 @@ function loadCurrentSnapshot( export interface DispatchStartOptions { /** Effective ceilings (from composeCeilings) to apply to workflow budget. */ readonly effective: EffectiveCeilings; + /** Shared concurrency controller. */ + readonly concurrency: ConcurrencyController; /** * Checkpoint resolver for HITL strategy: * - "auto": () => true (approve all) @@ -121,14 +122,18 @@ export async function dispatchStart( ctx: JobDispatchContext, runOpts: DispatchStartOptions, ): Promise { + const startPermit = runOpts.concurrency.acquireStart(ctx.workflow.name); + if (startPermit instanceof McpServerError) { + return formatErrorResult(startPermit); + } + // Validate input via the sync tool's input Zod (shared between sync + async) const inputTaskDef = ctx.workflow.tasks[ctx.tool.inputTask] as { agent: { input: import("zod").ZodType }; }; const parsed = inputTaskDef.agent.input.safeParse(args); if (!parsed.success) { - // Release inflight lock on validation failure (caller already set it) - ctx.releaseInflight(); + startPermit.release(); return formatErrorResult( new McpServerError( ErrorCode.INPUT_VALIDATION_FAILED, @@ -138,6 +143,20 @@ export async function dispatchStart( ); } + const jobPermit = runOpts.concurrency.acquireJob(ctx.workflow.name); + if (jobPermit instanceof McpServerError) { + startPermit.release(); + return formatErrorResult(jobPermit); + } + + let released = false; + const releasePermits = (): void => { + if (released) return; + released = true; + jobPermit.release(); + startPermit.release(); + }; + // Build the composed workflow — apply ceiling overrides and HITL hooks. // This mirrors the setup done on the sync path (composeCeilings + buildMcpHooks) // so that both paths behave identically with respect to CLI ceilings and HITL. @@ -152,112 +171,83 @@ export async function dispatchStart( // Notify test observers with the composed workflow (test-only hook). ctx._testOnComposedWorkflow?.(composedWorkflow); - // If a test executor is injected, temporarily register a fake runner that - // delegates to it. This ensures runner.fire() still registers the handle in - // the RunRegistry, so observer tools (get_workflow_status, cancel_workflow, - // etc.) can find it via runner.get(runId). - // - // In test mode, dispatchStart awaits job completion so that tests can do - // `await h.callTool("start_ask", ...)` and then immediately call observer - // tools without races. (In production, start_* returns jobId immediately - // and the runner background loop is truly fire-and-forget.) - if (ctx.testRunExecutor !== undefined) { - const testExec = ctx.testRunExecutor; - // Get the runner name used by the input task's agent - const inputAgent = ctx.workflow.tasks[ctx.tool.inputTask] as { - agent: { runner: string; output: import("zod").ZodType }; - }; - const runnerName = inputAgent.agent.runner; + let runnerName: string | undefined; + let jobId = ""; + try { + // If a test executor is injected, temporarily register a fake runner that + // delegates to it. This ensures runner.fire() still registers the handle in + // the RunRegistry, so observer tools can find it via runner.get(runId). + if (ctx.testRunExecutor !== undefined) { + const testExec = ctx.testRunExecutor; + const inputAgent = ctx.workflow.tasks[ctx.tool.inputTask] as { + agent: { runner: string; output: import("zod").ZodType }; + }; + runnerName = inputAgent.agent.runner; - // Register a temporary runner for the duration of this fire() call - const fakeRunner = { - validate: async () => ({ ok: true }), - spawn: async ( - _spawnArgs: RunnerSpawnArgs, - ): Promise => { - const output = await testExec(parsed.data); - return { - stdout: JSON.stringify(output), - sessionHandle: "test", - tokensIn: 0, - tokensOut: 0, - }; - }, - }; - registerRunner(runnerName, fakeRunner); + const fakeRunner = { + validate: async () => ({ ok: true }), + spawn: async ( + _spawnArgs: RunnerSpawnArgs, + ): Promise => { + const output = await testExec(parsed.data); + return { + stdout: JSON.stringify(output), + sessionHandle: "test", + tokensIn: 0, + tokensOut: 0, + }; + }, + }; + registerRunner(runnerName, fakeRunner); + } - let jobId = ""; - const handle = ctx.runner.fire(composedWorkflow, parsed.data, { + const onCheckpoint = runOpts.onCheckpoint; + const fireOptions = { ...(runOpts.abortSignal !== undefined ? { signal: runOpts.abortSignal } : {}), - // Apply HITL strategy: "auto" → approve, "fail" → reject, "elicit" → deferred. - ...(runOpts.onCheckpoint !== undefined + ...(onCheckpoint !== undefined ? { - // biome-ignore lint/style/noNonNullAssertion: guarded by outer !== undefined check - onCheckpoint: (ev) => runOpts.onCheckpoint!(ev), + onCheckpoint: (ev: import("@ageflow/core").CheckpointEvent) => + onCheckpoint(ev), } : {}), onEvent: (ev: WorkflowEvent) => ctx.recorder.record(ev), onComplete: () => { - unregisterRunner(runnerName); - persistSnapshot(ctx, jobId); - ctx.releaseInflight(); + if (runnerName !== undefined) { + unregisterRunner(runnerName); + } + if (jobId !== "") { + persistSnapshot(ctx, jobId); + } + releasePermits(); }, onError: () => { - unregisterRunner(runnerName); - persistSnapshot(ctx, jobId); - ctx.releaseInflight(); + if (runnerName !== undefined) { + unregisterRunner(runnerName); + } + if (jobId !== "") { + persistSnapshot(ctx, jobId); + } + releasePermits(); }, - }); + }; + const handle = ctx.runner.fire(composedWorkflow, parsed.data, fireOptions); jobId = handle.runId; persistSnapshot(ctx, jobId, handle); - // Return the result immediately so the caller has the jobId. - // completionPromise is available for tests that need to sync on completion. return { content: [{ type: "text", text: JSON.stringify({ jobId }) }], structuredContent: { jobId }, isError: false, }; + } catch (err) { + if (runnerName !== undefined) { + unregisterRunner(runnerName); + } + releasePermits(); + return formatErrorResult(err); } - - let jobId = ""; - const handle = ctx.runner.fire(composedWorkflow, parsed.data, { - // Wire the DurationWatchdog abort signal (if any) so the internal executor - // honours the duration ceiling on the async path. - ...(runOpts.abortSignal !== undefined - ? { signal: runOpts.abortSignal } - : {}), - // Apply HITL strategy from runOpts: - // - "auto" maps to onCheckpoint: () => true - // - "fail" maps to onCheckpoint: () => false - // - "elicit" maps to undefined → runner uses deferred path so - // resume_workflow can resolve checkpoints externally. - ...(runOpts.onCheckpoint !== undefined - ? { - // biome-ignore lint/style/noNonNullAssertion: guarded by outer !== undefined check - onCheckpoint: (ev) => runOpts.onCheckpoint!(ev), - } - : {}), - onEvent: (ev: WorkflowEvent) => ctx.recorder.record(ev), - onComplete: () => { - persistSnapshot(ctx, jobId); - ctx.releaseInflight(); - }, - onError: () => { - persistSnapshot(ctx, jobId); - ctx.releaseInflight(); - }, - }); - - jobId = handle.runId; - persistSnapshot(ctx, jobId, handle); - return { - content: [{ type: "text", text: JSON.stringify({ jobId }) }], - structuredContent: { jobId }, - isError: false, - }; } export function dispatchGetStatus( @@ -502,7 +492,6 @@ export function createJobDispatchContext(args: { jobStore: RunStore; jobTtlMs?: number; jobCheckpointTtlMs?: number; - releaseInflight: () => void; }): JobDispatchContext { const runner = createRunner({ ttlMs: args.jobTtlMs ?? 30 * 60_000, @@ -519,7 +508,6 @@ export function createJobDispatchContext(args: { jobTtlMs: args.jobTtlMs ?? 30 * 60_000, jobCheckpointTtlMs: args.jobCheckpointTtlMs ?? 60 * 60_000, taskCount: Object.keys(args.workflow.tasks).length, - releaseInflight: args.releaseInflight, }; } diff --git a/packages/mcp-server/src/programmatic.ts b/packages/mcp-server/src/programmatic.ts index b10fb89..8c1c9e5 100644 --- a/packages/mcp-server/src/programmatic.ts +++ b/packages/mcp-server/src/programmatic.ts @@ -27,7 +27,7 @@ import { createSingleWorkflowServer, } from "./server.js"; import { startStdioTransport } from "./stdio-transport.js"; -import type { CliCeilings, HitlStrategy } from "./types.js"; +import type { CliCeilings, ConcurrencyConfig, HitlStrategy } from "./types.js"; // ─── Public types ───────────────────────────────────────────────────────────── @@ -160,6 +160,12 @@ export interface McpServerConfig { */ jobStore?: RunStore; + /** Enable async job mode (start + observer tools) for all workflows. */ + async?: boolean; + + /** Concurrency controller configuration shared across workflows. */ + concurrency?: ConcurrencyConfig; + /** * MCP server name advertised during initialization. Defaults to the first * workflow's name (or "ageflow-mcp" for multi-workflow setups). @@ -399,6 +405,8 @@ export function createMcpServer(config: McpServerConfig): McpHandle { seen.add(name); } + validateConcurrencyWorkflowNames(config.concurrency, seen); + const hitlStrategy = config.hitlStrategy ?? "elicit"; const stderr = config.stderr ?? @@ -444,6 +452,10 @@ export function createMcpServer(config: McpServerConfig): McpHandle { workflow: patchedWorkflow, cliCeilings: config.ceilings ?? {}, ...(config.jobStore !== undefined ? { jobStore: config.jobStore } : {}), + ...(config.async === true ? { async: true } : {}), + ...(config.concurrency !== undefined + ? { concurrency: config.concurrency } + : {}), // When onHitl is set it is baked into patchedWorkflow.hooks.onCheckpoint. // buildMcpHooks in hitl-bridge.ts calls that hook first: if it returns true // the checkpoint is approved; if it returns false the bridge falls through @@ -555,3 +567,28 @@ export function createMcpServer(config: McpServerConfig): McpHandle { return handle; } + +function validateConcurrencyWorkflowNames( + concurrency: ConcurrencyConfig | undefined, + workflowNames: ReadonlySet, +): void { + if (concurrency === undefined) { + return; + } + + for (const workflowName of Object.keys(concurrency.perWorkflow ?? {})) { + if (!workflowNames.has(workflowName)) { + throw new Error( + `createMcpServer: unknown workflow name "${workflowName}" in concurrency.perWorkflow`, + ); + } + } + + for (const workflowName of Object.keys(concurrency.workflows ?? {})) { + if (!workflowNames.has(workflowName)) { + throw new Error( + `createMcpServer: unknown workflow name "${workflowName}" in concurrency.workflows`, + ); + } + } +} diff --git a/packages/mcp-server/src/server.ts b/packages/mcp-server/src/server.ts index 212bbd6..bb10a06 100644 --- a/packages/mcp-server/src/server.ts +++ b/packages/mcp-server/src/server.ts @@ -3,6 +3,7 @@ import { resolveMcpConfig } from "@ageflow/core"; import { WorkflowExecutor } from "@ageflow/executor"; import { InMemoryRunStore, type RunStore } from "@ageflow/server"; import { composeCeilings } from "./ceiling-resolver.js"; +import { createConcurrencyController } from "./concurrency.js"; import { ErrorCode, McpServerError, @@ -34,7 +35,12 @@ import { import { buildJobTools } from "./job-tools.js"; import { ProgressStreamer, type SendProgress } from "./progress-streamer.js"; import { type ToolDefinition, buildToolDefinition } from "./tool-registry.js"; -import type { CliCeilings, EffectiveCeilings, HitlStrategy } from "./types.js"; +import type { + CliCeilings, + ConcurrencyConfig, + EffectiveCeilings, + HitlStrategy, +} from "./types.js"; import { DurationWatchdog } from "./watchdog.js"; export type RunWorkflowFn = (args: { @@ -51,10 +57,14 @@ export interface McpServerOptions { readonly hitlStrategy: HitlStrategy; /** Opt-in async job mode. Default: false. */ readonly async?: boolean; + /** Server-wide async start limit. Default: 1. */ + readonly maxConcurrentStarts?: number; /** Override the default 30-minute job TTL (async mode only). */ readonly jobTtlMs?: number; /** Override the default 1-hour checkpoint TTL (async mode only). */ readonly jobCheckpointTtlMs?: number; + /** Concurrent admission / job limits. */ + readonly concurrency?: ConcurrencyConfig; /** Optional async job registry store. Defaults to an in-memory store. */ readonly runStore?: RunStore; /** Back-compat alias for `runStore`. */ @@ -139,6 +149,9 @@ export function createSingleWorkflowServer( } const resolved = resolveMcpConfig(opts.workflow.mcp); + const concurrency = createConcurrencyController( + buildConcurrencyConfigForSingleWorkflow(opts), + ); const stderr = opts.stderr ?? ((line: string) => { @@ -147,7 +160,7 @@ export function createSingleWorkflowServer( const tool = buildToolDefinition(opts.workflow); const jobTools = opts.async === true ? buildJobTools(opts.workflow) : []; - let inflight = false; + let syncInflight = false; let dispatchCtx: JobDispatchContext | undefined; let dispatchCtxPromise: Promise | undefined; let jobStorePromise: Promise | undefined; @@ -209,7 +222,6 @@ export function createSingleWorkflowServer( ...(opts.jobCheckpointTtlMs !== undefined ? { jobCheckpointTtlMs: opts.jobCheckpointTtlMs } : {}), - releaseInflight: () => {}, }); dispatchCtx.runner.recover?.(opts.workflow); return dispatchCtx; @@ -234,7 +246,6 @@ export function createSingleWorkflowServer( ...(opts.jobCheckpointTtlMs !== undefined ? { jobCheckpointTtlMs: opts.jobCheckpointTtlMs } : {}), - releaseInflight: () => {}, }); dispatchCtx.runner.recover?.(opts.workflow); return dispatchCtx; @@ -251,20 +262,6 @@ export function createSingleWorkflowServer( } } - async function hasActiveRun(): Promise { - const ctx = - dispatchCtx !== undefined - ? dispatchCtx - : opts.jobDbPath === undefined - ? ensureDispatchCtxSync() - : await ensureDispatchCtx(); - return ctx.runner.list().some( - (snapshot) => - snapshot.state === "running" || - snapshot.state === "awaiting-checkpoint", - ); - } - const handle: McpServerHandle = { async listTools() { return opts.async === true ? [tool, ...jobTools] : [tool]; @@ -302,7 +299,7 @@ export function createSingleWorkflowServer( ); } - // Observer tools: no inflight lock. + // Observer tools do not consume start/job permits. if (isObserver) { const ctx = await ensureDispatchCtx(); switch (name) { @@ -318,119 +315,78 @@ export function createSingleWorkflowServer( } if (isStart) { - if (inflight) { - return formatErrorResult( - new McpServerError( - ErrorCode.BUSY, - "Another workflow run is in progress", - ), - ); + const ctx = + opts.jobDbPath === undefined + ? ensureDispatchCtxSync() + : await ensureDispatchCtx(); + // Thread test hooks into context (test-only). + if (handle._testOnComposedWorkflow !== undefined) { + ctx._testOnComposedWorkflow = handle._testOnComposedWorkflow; } - inflight = true; - try { - if (await hasActiveRun()) { - return formatErrorResult( - new McpServerError( - ErrorCode.BUSY, - "Another workflow run is in progress", - ), + // Thread test executor into context if set on handle (wraps 4-arg to 1-arg) + if (handle._testRunExecutor !== undefined) { + const testExec = handle._testRunExecutor; + ctx.testRunExecutor = (input: unknown) => + testExec( + input, + undefined, + new AbortController().signal, + {} as EffectiveCeilings, ); - } - const ctx = - opts.jobDbPath === undefined - ? ensureDispatchCtxSync() - : await ensureDispatchCtx(); - // Thread test hooks into context (test-only). - if (handle._testOnComposedWorkflow !== undefined) { - ctx._testOnComposedWorkflow = handle._testOnComposedWorkflow; - } - // Thread test executor into context if set on handle (wraps 4-arg to 1-arg) - if (handle._testRunExecutor !== undefined) { - const testExec = handle._testRunExecutor; - ctx.testRunExecutor = (input: unknown) => - testExec( - input, - undefined, - new AbortController().signal, - {} as EffectiveCeilings, - ); - } - - // Apply the same ceiling/hook composition as the sync path so that - // async jobs respect CLI ceiling overrides and the configured HITL strategy. - const asyncEffective = composeCeilings( - resolved, - opts.cliCeilings, - stderr, + } + + // Apply the same ceiling/hook composition as the sync path so that + // async jobs respect CLI ceiling overrides and the configured HITL strategy. + const asyncEffective = composeCeilings(resolved, opts.cliCeilings, stderr); + + // DurationWatchdog for async: instead of Promise.race (which requires + // awaiting the run), we wire an AbortController into runner.fire() via + // the `signal` option. When maxDurationSec elapses the signal fires, + // the internal executor stream receives it, and the run is cancelled. + let asyncWatchdogSignal: AbortSignal | undefined; + if (asyncEffective.maxDurationSec !== null) { + const asyncWatchdog = new DurationWatchdog( + asyncEffective.maxDurationSec, + () => {}, ); + asyncWatchdog.start(); + asyncWatchdogSignal = asyncWatchdog.abortSignal; + } - // DurationWatchdog for async: instead of Promise.race (which requires - // awaiting the run), we wire an AbortController into runner.fire() via - // the `signal` option. When maxDurationSec elapses the signal fires, - // the internal executor stream receives it, and the run is cancelled. - // This is semantically equivalent to the sync watchdog (both abort the - // underlying executor) but adapted for fire-and-forget execution. - // Note: the run will appear as "cancelled" (not "duration-exceeded") in - // the job registry, which is the correct terminal state for async jobs - // that hit their duration ceiling. A finer-grained DURATION_EXCEEDED - // status is left as a follow-up (tracked in #84 item 11). - let asyncWatchdogSignal: AbortSignal | undefined; - if (asyncEffective.maxDurationSec !== null) { - const asyncWatchdog = new DurationWatchdog( - asyncEffective.maxDurationSec, - () => {}, - ); - asyncWatchdog.start(); - asyncWatchdogSignal = asyncWatchdog.abortSignal; - } - - // Derive async HITL strategy: map hitlStrategy to a simple checkpoint - // resolver for runner.fire(). The async path cannot use MCP elicitation - // (no persistent connection during fire-and-forget execution), so: - // "auto" → immediately approve - // "fail" → immediately reject - // "elicit" → undefined (runner's deferred path; client uses resume_workflow) - const asyncOnCheckpoint: DispatchStartOptions["onCheckpoint"] = - opts.hitlStrategy === "auto" - ? () => true - : opts.hitlStrategy === "fail" - ? () => false - : undefined; // elicit → deferred resume_workflow mechanism - - const dispatchOpts: DispatchStartOptions = { - effective: asyncEffective, - ...(asyncOnCheckpoint !== undefined - ? { onCheckpoint: asyncOnCheckpoint } - : {}), - ...(asyncWatchdogSignal !== undefined - ? { abortSignal: asyncWatchdogSignal } - : {}), - }; + // Derive async HITL strategy for runner.fire. + const asyncOnCheckpoint: DispatchStartOptions["onCheckpoint"] = + opts.hitlStrategy === "auto" + ? () => true + : opts.hitlStrategy === "fail" + ? () => false + : undefined; // elicit → deferred resume_workflow mechanism + + const dispatchOpts: DispatchStartOptions = { + effective: asyncEffective, + concurrency, + ...(asyncOnCheckpoint !== undefined + ? { onCheckpoint: asyncOnCheckpoint } + : {}), + ...(asyncWatchdogSignal !== undefined + ? { abortSignal: asyncWatchdogSignal } + : {}), + }; - // dispatchStart releases inflight via ctx.releaseInflight (onComplete/onError) - // or on validation failure. - return await dispatchStart(name, args, ctx, dispatchOpts); - } finally { - inflight = false; - } + return await dispatchStart(name, args, ctx, dispatchOpts); } - // Sync tool: share the inflight lock and hydrated active-run state. - if (isSync) { - const busy = - inflight || (opts.async === true && (await hasActiveRun())); - if (busy) { - return formatErrorResult( - new McpServerError( - ErrorCode.BUSY, - "Another workflow run is in progress", - ), - ); - } + // Sync tool executions use a separate re-entry lock so async start + // admission does not block normal tool calls. + if (syncInflight) { + return formatErrorResult( + new McpServerError( + ErrorCode.BUSY, + "Another workflow run is in progress", + ), + ); } - // Sync path (existing body) — inflight cleared in finally. - inflight = true; + syncInflight = true; try { // Validate input const inputTaskDef = (opts.workflow.tasks as Record)[ @@ -556,7 +512,7 @@ export function createSingleWorkflowServer( } catch (err) { return formatErrorResult(err); } finally { - inflight = false; + syncInflight = false; } }, }; @@ -564,6 +520,39 @@ export function createSingleWorkflowServer( return handle; } +function buildConcurrencyConfigForSingleWorkflow( + opts: McpServerOptions, +): ConcurrencyConfig | undefined { + const concurrency = opts.concurrency; + const hasLegacyStartLimit = opts.maxConcurrentStarts !== undefined; + if (concurrency === undefined && !hasLegacyStartLimit) { + return undefined; + } + + const serverLimit = + concurrency?.maxConcurrentJobs ?? + concurrency?.maxConcurrentStarts ?? + concurrency?.maxConcurrentJobsPerWorkflow ?? + opts.maxConcurrentStarts; + return { + ...(concurrency ?? {}), + ...(serverLimit !== undefined + ? { + maxConcurrentStarts: serverLimit, + maxConcurrentJobs: serverLimit, + } + : {}), + ...(concurrency?.maxConcurrentJobsPerWorkflow !== undefined + ? { + perWorkflow: { + ...(concurrency.perWorkflow ?? {}), + [opts.workflow.name]: concurrency.maxConcurrentJobsPerWorkflow, + }, + } + : {}), + }; +} + function safeParse( schema: { safeParse: (v: unknown) => { diff --git a/packages/mcp-server/src/types.ts b/packages/mcp-server/src/types.ts index 6e4d7a6..4b86bca 100644 --- a/packages/mcp-server/src/types.ts +++ b/packages/mcp-server/src/types.ts @@ -10,4 +10,21 @@ export interface EffectiveCeilings { readonly maxTurns: number | null; } +export interface WorkflowConcurrencyConfig { + readonly maxConcurrentStarts?: number | null; + readonly maxConcurrentJobs?: number | null; + readonly maxConcurrentJobsPerWorkflow?: number | null; +} + +export interface ConcurrencyConfig extends WorkflowConcurrencyConfig { + readonly perWorkflow?: Readonly>; + readonly workflows?: Readonly>; +} + +export interface ResolvedConcurrencyConfig { + readonly maxConcurrentStarts: number | null; + readonly maxConcurrentJobs: number | null; + readonly workflows: Readonly>; +} + export type HitlStrategy = "elicit" | "auto" | "fail";