Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ageflow/cli",
"version": "0.6.0",
"version": "0.7.0",
"description": "CLI for ageflow \u2014 agentwf run / validate / dry-run / init",
"homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/cli",
"type": "module",
Expand All @@ -26,7 +26,7 @@
"@ageflow/executor": "^0.7.0",
"@ageflow/learning": "^0.5.0",
"@ageflow/learning-sqlite": "^0.4.1",
"@ageflow/mcp-server": "^0.7.0",
"@ageflow/mcp-server": "^0.8.0",
"@ageflow/server": "^0.6.0",
"@ageflow/server-sqlite": "^0.2.0",
"chalk": "^5.3.0",
Expand Down
57 changes: 57 additions & 0 deletions packages/cli/src/__tests__/mcp-serve.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,31 @@ describe("parseMcpServeArgs", () => {
expect(args.maxTurns).toBeNull();
});

it("parses --max-concurrent-jobs", () => {
const args = parseMcpServeArgs([
"wf.ts",
"--async",
"--max-concurrent-jobs",
"3",
]);
expect(args.maxConcurrentJobs).toBe(3);
});

it("defaults maxConcurrentJobs to undefined", () => {
const args = parseMcpServeArgs(["wf.ts"]);
expect(args.maxConcurrentJobs).toBeUndefined();
});

it("parses --max-concurrent-jobs-per-workflow", () => {
const args = parseMcpServeArgs([
"wf.ts",
"--async",
"--max-concurrent-jobs-per-workflow",
"4",
]);
expect(args.maxConcurrentJobsPerWorkflow).toBe(4);
});

it("parses --hitl auto", () => {
const args = parseMcpServeArgs(["wf.ts", "--hitl", "auto"]);
expect(args.hitlStrategy).toBe("auto");
Expand Down Expand Up @@ -178,6 +203,38 @@ describe("parseMcpServeArgs", () => {
);
});

it("throws on invalid --max-concurrent-jobs value", () => {
expect(() =>
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "0"]),
).toThrow(/positive integer/);
});

it("throws on negative --max-concurrent-jobs value", () => {
expect(() =>
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "-2"]),
).toThrow(/positive integer/);
});

it("throws on invalid --max-concurrent-jobs-per-workflow value", () => {
expect(() =>
parseMcpServeArgs([
"wf.ts",
"--async",
"--max-concurrent-jobs-per-workflow",
"1.5",
]),
).toThrow(/positive integer/);
});

it("requires --async for concurrency flags", () => {
expect(() =>
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "2"]),
).toThrow(/requires --async/);
expect(() =>
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs-per-workflow", "2"]),
).toThrow(/requires --async/);
});

it("throws on unknown flag", () => {
expect(() => parseMcpServeArgs(["wf.ts", "--unknown-flag"])).toThrow(
/Unknown flag/,
Expand Down
74 changes: 71 additions & 3 deletions packages/cli/src/commands/mcp-serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
* --no-max-duration disable duration ceiling
* --max-turns <n> maximum agent turns
* --no-max-turns disable turns ceiling
* --max-concurrent-jobs <n> max concurrent running jobs
* --max-concurrent-jobs-per-workflow <n> max concurrent running jobs for this workflow
* --hitl <strategy> HITL strategy: elicit | auto | fail (default: elicit)
* --name <name> MCP server name (default: workflow name)
* --log-file <path> write stderr log to a file
Expand All @@ -24,7 +26,11 @@
import fs from "node:fs";
import path from "node:path";
import type { WorkflowDef } from "@ageflow/core";
import type { CliCeilings, HitlStrategy } from "@ageflow/mcp-server";
import type {
CliCeilings,
ConcurrencyConfig,
HitlStrategy,
} from "@ageflow/mcp-server";
import {
createHttpTransport,
createSingleWorkflowServer,
Expand All @@ -39,6 +45,8 @@ export interface McpServeArgs {
readonly maxCostUsd?: number | null;
readonly maxDurationSec?: number | null;
readonly maxTurns?: number | null;
readonly maxConcurrentJobs?: number;
readonly maxConcurrentJobsPerWorkflow?: number;
readonly hitlStrategy: HitlStrategy;
readonly serverName?: string;
readonly logFile?: string;
Expand Down Expand Up @@ -86,6 +94,8 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
let maxCostUsd: number | null | undefined = undefined;
let maxDurationSec: number | null | undefined = undefined;
let maxTurns: number | null | undefined = undefined;
let maxConcurrentJobs: number | undefined = undefined;
let maxConcurrentJobsPerWorkflow: number | undefined = undefined;
let hitlStrategy: HitlStrategy = "elicit";
let serverName: string | undefined = undefined;
let logFile: string | undefined = undefined;
Expand Down Expand Up @@ -155,6 +165,38 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
maxTurns = null;
break;

case "--max-concurrent-jobs": {
const val = args[++i];
if (val === undefined || val.startsWith("--")) {
throw new Error("--max-concurrent-jobs requires a numeric argument");
}
const n = Number(val);
if (!Number.isInteger(n) || n <= 0) {
throw new Error(
`--max-concurrent-jobs must be a positive integer, got: ${val}`,
);
}
maxConcurrentJobs = n;
break;
}

case "--max-concurrent-jobs-per-workflow": {
const val = args[++i];
if (val === undefined || val.startsWith("--")) {
throw new Error(
"--max-concurrent-jobs-per-workflow requires a numeric argument",
);
}
const n = Number(val);
if (!Number.isInteger(n) || n <= 0) {
throw new Error(
`--max-concurrent-jobs-per-workflow must be a positive integer, got: ${val}`,
);
}
maxConcurrentJobsPerWorkflow = n;
break;
}

case "--hitl": {
const val = args[++i];
if (val !== "elicit" && val !== "auto" && val !== "fail") {
Expand Down Expand Up @@ -273,9 +315,13 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
asyncMode !== true &&
(jobTtlMs !== undefined ||
jobCheckpointTtlMs !== undefined ||
jobDb !== undefined)
jobDb !== undefined ||
maxConcurrentJobs !== undefined ||
maxConcurrentJobsPerWorkflow !== undefined)
) {
throw new Error("--job-ttl / --checkpoint-ttl / --job-db requires --async");
throw new Error(
"--job-ttl / --checkpoint-ttl / --job-db / --max-concurrent-jobs / --max-concurrent-jobs-per-workflow requires --async",
);
}

if (httpPort !== undefined && httpMode !== true) {
Expand Down Expand Up @@ -311,6 +357,10 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
...(maxCostUsd !== undefined ? { maxCostUsd } : {}),
...(maxDurationSec !== undefined ? { maxDurationSec } : {}),
...(maxTurns !== undefined ? { maxTurns } : {}),
...(maxConcurrentJobs !== undefined ? { maxConcurrentJobs } : {}),
...(maxConcurrentJobsPerWorkflow !== undefined
? { maxConcurrentJobsPerWorkflow }
: {}),
...(serverName !== undefined ? { serverName } : {}),
...(logFile !== undefined ? { logFile } : {}),
...(asyncMode !== undefined ? { async: asyncMode } : {}),
Expand Down Expand Up @@ -370,6 +420,21 @@ async function runMcpServe(rawArgv: string[]): Promise<void> {
: {}),
...(parsed.maxTurns !== undefined ? { maxTurns: parsed.maxTurns } : {}),
};
const concurrency: ConcurrencyConfig | undefined =
parsed.maxConcurrentJobs !== undefined ||
parsed.maxConcurrentJobsPerWorkflow !== undefined
? {
...(parsed.maxConcurrentJobs !== undefined
? { maxConcurrentJobs: parsed.maxConcurrentJobs }
: {}),
...(parsed.maxConcurrentJobsPerWorkflow !== undefined
? {
maxConcurrentJobsPerWorkflow:
parsed.maxConcurrentJobsPerWorkflow,
}
: {}),
}
: undefined;

// Determine server name
const serverName = parsed.serverName ?? workflow.name;
Expand All @@ -390,6 +455,7 @@ async function runMcpServe(rawArgv: string[]): Promise<void> {
workflow,
cliCeilings,
hitlStrategy: parsed.hitlStrategy,
...(concurrency !== undefined ? { concurrency } : {}),
stderr,
...(parsed.async === true ? { async: true } : {}),
...(parsed.jobTtlMs !== undefined ? { jobTtlMs: parsed.jobTtlMs } : {}),
Expand Down Expand Up @@ -474,6 +540,8 @@ export function registerMcpCommand(program: Command): void {
" --no-max-duration disable duration ceiling\n" +
" --max-turns <n> max agent turns\n" +
" --no-max-turns disable turns ceiling\n" +
" --max-concurrent-jobs <n> max concurrent running jobs\n" +
" --max-concurrent-jobs-per-workflow <n> max concurrent running jobs for this workflow\n" +
" --hitl <strategy> elicit | auto | fail (default: elicit)\n" +
" --name <name> MCP server name\n" +
" --log-file <path> log stderr to file\n" +
Expand Down
2 changes: 1 addition & 1 deletion packages/mcp-server/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ageflow/mcp-server",
"version": "0.7.0",
"version": "0.8.0",
"description": "Expose ageflow workflows as MCP tools (stdio transport, progress streaming, HITL via elicitation).",
"homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/mcp-server",
"type": "module",
Expand Down
3 changes: 3 additions & 0 deletions packages/mcp-server/src/__tests__/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ describe("async-mode error mapping (#18)", () => {
expect(ErrorCode.JOB_CANCELLED).toBe("JOB_CANCELLED");
expect(ErrorCode.INVALID_RUN_STATE).toBe("INVALID_RUN_STATE");
expect(ErrorCode.ASYNC_MODE_DISABLED).toBe("ASYNC_MODE_DISABLED");
expect(ErrorCode.CONCURRENCY_LIMIT_EXCEEDED).toBe(
"CONCURRENCY_LIMIT_EXCEEDED",
);
});

it("maps RunNotFoundError → JOB_NOT_FOUND", () => {
Expand Down
42 changes: 42 additions & 0 deletions packages/mcp-server/src/__tests__/integration/async-mode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,48 @@ describe("async mode: inflight lock (#18)", () => {
release();
h.dispose?.();
});

it("applies maxConcurrentJobsPerWorkflow in single-workflow mode", async () => {
const h = createMcpServer({
workflow,
cliCeilings: {},
hitlStrategy: "auto",
async: true,
concurrency: {
maxConcurrentJobs: 2,
maxConcurrentJobsPerWorkflow: 1,
},
});

let release!: () => void;
h._testRunExecutor = () =>
new Promise((res) => {
release = () => res({ a: "ok" });
});

const first = h.callTool("start_ask", { q: "1" });
for (let i = 0; i < 50 && typeof release !== "function"; i += 1) {
await new Promise<void>((resolve) => setTimeout(resolve, 0));
}
expect(typeof release).toBe("function");

const second = await h.callTool("start_ask", { q: "2" });
expect(second.isError).toBe(true);
if (second.isError) {
expect(second.structuredContent.errorCode).toBe("BUSY");
expect(second.structuredContent.context).toMatchObject({
scope: "workflow",
kind: "start",
workflowName: "ask",
limit: 1,
active: 1,
});
}

release();
await first;
h.dispose?.();
});
});

describe("async mode: cancel_workflow (#18)", () => {
Expand Down
Loading
Loading