From 0a2aa028bfea90144ded03541d3c26123480b0e9 Mon Sep 17 00:00:00 2001 From: Roman Melnikov Date: Sun, 19 Apr 2026 15:43:39 +0300 Subject: [PATCH] feat(dev-workflow): stream live progress for issue #234 --- packages/dev-workflow/README.md | 11 +- .../__tests__/run.progress.test.ts | 240 ++++++++++++++++++ packages/dev-workflow/run.ts | 108 +++++++- 3 files changed, 348 insertions(+), 11 deletions(-) create mode 100644 packages/dev-workflow/__tests__/run.progress.test.ts diff --git a/packages/dev-workflow/README.md b/packages/dev-workflow/README.md index 4a33468..e0bb348 100644 --- a/packages/dev-workflow/README.md +++ b/packages/dev-workflow/README.md @@ -8,14 +8,14 @@ Every code change to the ageflow monorepo goes through this pipeline: PLAN → BUILD → TEST → VERIFY → SHIP — implemented as an ageflow workflow. This is the strategic commitment from issue #194 to use our own tool in anger. -## Status: scaffold — sub-PR 1 of 5 (issue #194) +## Status: in active dogfood rollout (issue #194) | Sub-PR | Status | Contents | |--------|--------|----------| -| 1 (this) | merged | Package scaffold, pipeline stubs, run.ts wiring | +| 1 | merged | Package scaffold, pipeline stubs, run.ts wiring | | 2 | pending | Role library + ageflow-orchestrator role | | 3 | pending | Learning hooks + SQLite store | -| 4 | pending | First real issue run through dogfood | +| 4 | in progress | Real issue runs through dogfood with live stream progress | | 5 | pending | 10-run retrospective + tuning | ## Invoke (once sub-PR 4 is merged) @@ -28,12 +28,13 @@ bun run --filter @ageflow/dev-workflow dev-workflow bun run --filter @ageflow/dev-workflow dev-workflow --dry-run ``` +Live (non-dry) runs stream workflow progress as they execute: task start/completion, per-task durations, +running spend in USD, and budget-cap progress go to stdout; budget warnings and task failures go to stderr. + ## What is NOT implemented yet - **Real LLM tasks** — pipeline stubs use `defineFunction` no-ops. Real role-based agents land in sub-PR 2. 
-- **Executor dispatch** — `run.ts` loads the issue and logs the plan but does - not call `WorkflowExecutor.stream()`. That wiring lands in sub-PR 4. - **Git worktree creation** — `createWorktree()` logs the would-be command but does not run `git worktree add`. Real call lands in sub-PR 4. - **Learning hooks** — `@ageflow/learning` is declared as a dependency but diff --git a/packages/dev-workflow/__tests__/run.progress.test.ts b/packages/dev-workflow/__tests__/run.progress.test.ts new file mode 100644 index 0000000..35e9158 --- /dev/null +++ b/packages/dev-workflow/__tests__/run.progress.test.ts @@ -0,0 +1,240 @@ +import type { TaskMetrics, WorkflowEvent } from "@ageflow/core"; +import type { WorkflowExecutor } from "@ageflow/executor"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { renderEvent, runWithProgress } from "../run.js"; +import type { WorkflowInput } from "../shared/types.js"; + +vi.mock("../shared/learning.js", () => ({ + initLearning: () => ({ + hooks: {}, + store: { close: () => {} }, + dbPath: "/tmp/learning.sqlite", + }), +})); + +function taskMetrics(estimatedCost: number, latencyMs: number): TaskMetrics { + return { + tokensIn: 0, + tokensOut: 0, + latencyMs, + retries: 0, + estimatedCost, + }; +} + +const baseEvent = { + runId: "run-1", + workflowName: "feature-pipeline", +} as const; + +const fakeInput: WorkflowInput = { + issue: { + number: 234, + title: "progress smoke", + labels: ["feature"], + state: "open", + url: "https://example.com/issues/234", + }, + worktreePath: "/tmp/agents-workflow-wt-234", + specPath: "/tmp/spec.md", + dryRun: false, +}; + +describe("renderEvent", () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it("prints task start and complete progress, including budget progress", () => { + const logSpy = vi.spyOn(console, "log").mockImplementation(() => {}); + + const state = { + starts: new Map(), + spentUsd: 0, + warned80: false, + }; + + const startEv: WorkflowEvent = { + 
...baseEvent, + type: "task:start", + taskName: "build", + timestamp: 1_000, + }; + const completeEv: WorkflowEvent = { + ...baseEvent, + type: "task:complete", + taskName: "build", + output: { ok: true }, + metrics: taskMetrics(0.25, 999), + timestamp: 2_500, + }; + + renderEvent(startEv, 1, state); + renderEvent(completeEv, 1, state); + + expect(state.spentUsd).toBeCloseTo(0.25); + const lines = logSpy.mock.calls.map((call) => String(call[0])); + expect(lines.some((line) => line.includes("build started"))).toBe(true); + expect(lines.some((line) => line.includes("build completed in 1.5s"))).toBe( + true, + ); + expect( + lines.some((line) => line.includes("$0.2500 / $1.0000 (25.0%)")), + ).toBe(true); + }); + + it("emits the 80% warning once", () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + vi.spyOn(console, "log").mockImplementation(() => {}); + + const state = { + starts: new Map(), + spentUsd: 0, + warned80: false, + }; + + const completeA: WorkflowEvent = { + ...baseEvent, + type: "task:complete", + taskName: "plan", + output: "ok", + metrics: taskMetrics(0.4, 100), + timestamp: 10, + }; + const completeB: WorkflowEvent = { + ...baseEvent, + type: "task:complete", + taskName: "build", + output: "ok", + metrics: taskMetrics(0.4, 100), + timestamp: 20, + }; + const completeC: WorkflowEvent = { + ...baseEvent, + type: "task:complete", + taskName: "test", + output: "ok", + metrics: taskMetrics(0.4, 100), + timestamp: 30, + }; + + renderEvent(completeA, 1, state); + renderEvent(completeB, 1, state); + renderEvent(completeC, 1, state); + + expect(state.warned80).toBe(true); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(String(warnSpy.mock.calls[0]?.[0] ?? 
"")).toContain( + "reached 80.0% of cap", + ); + }); + + it("prints unlimited-budget line when budget cap is not set", () => { + const logSpy = vi.spyOn(console, "log").mockImplementation(() => {}); + + const state = { + starts: new Map(), + spentUsd: 0, + warned80: false, + }; + + const completeEv: WorkflowEvent = { + ...baseEvent, + type: "task:complete", + taskName: "verify", + output: "ok", + metrics: taskMetrics(0.1, 100), + timestamp: 10, + }; + + renderEvent(completeEv, undefined, state); + + const lines = logSpy.mock.calls.map((call) => String(call[0])); + expect(lines.some((line) => line.includes("(no cap)"))).toBe(true); + }); + + it("prints task error with duration when a start timestamp exists", () => { + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + vi.spyOn(console, "log").mockImplementation(() => {}); + + const state = { + starts: new Map(), + spentUsd: 0, + warned80: false, + }; + + const startEv: WorkflowEvent = { + ...baseEvent, + type: "task:start", + taskName: "ship", + timestamp: 2_000, + }; + const errorEv: WorkflowEvent = { + ...baseEvent, + type: "task:error", + taskName: "ship", + error: { name: "Error", message: "push failed" }, + attempt: 1, + terminal: true, + timestamp: 7_000, + }; + + renderEvent(startEv, 1, state); + renderEvent(errorEv, 1, state); + + expect(String(errorSpy.mock.calls[0]?.[0] ?? 
"")).toContain( + "ship failed after 5.0s: push failed", + ); + }); +}); + +describe("runWithProgress", () => { + it("drains stream events and returns generator done value", async () => { + vi.spyOn(console, "log").mockImplementation(() => {}); + vi.spyOn(console, "warn").mockImplementation(() => {}); + vi.spyOn(console, "error").mockImplementation(() => {}); + + const events: WorkflowEvent[] = [ + { + ...baseEvent, + type: "task:start", + taskName: "plan", + timestamp: 1, + }, + { + ...baseEvent, + type: "task:complete", + taskName: "plan", + output: { ok: true }, + metrics: taskMetrics(0.05, 100), + timestamp: 101, + }, + { + ...baseEvent, + type: "task:error", + taskName: "build", + error: { name: "Error", message: "lint failed" }, + attempt: 1, + terminal: true, + timestamp: 202, + }, + ]; + + const doneValue = { + outputs: { plan: { ok: true } }, + metrics: { totalEstimatedCost: 0.05 }, + }; + + const fakeExecutor = { + async *stream() { + for (const ev of events) { + yield ev; + } + return doneValue; + }, + } as unknown as WorkflowExecutor>; + + const result = await runWithProgress(fakeExecutor, fakeInput, 5); + expect(result).toEqual(doneValue); + }); +}); diff --git a/packages/dev-workflow/run.ts b/packages/dev-workflow/run.ts index f0b7527..e90df12 100644 --- a/packages/dev-workflow/run.ts +++ b/packages/dev-workflow/run.ts @@ -20,7 +20,7 @@ import { dirname, resolve } from "node:path"; import { fileURLToPath } from "node:url"; import { registerRunner } from "@ageflow/core"; -import type { WorkflowHooks } from "@ageflow/core"; +import type { TasksMap, WorkflowEvent, WorkflowHooks } from "@ageflow/core"; import { BudgetTracker, WorkflowExecutor } from "@ageflow/executor"; import { CodexRunner } from "@ageflow/runner-codex"; import { determinePipeline, loadIssue } from "./shared/issue-loader.js"; @@ -47,6 +47,100 @@ const pipelineFactories = { const __dirname = dirname(fileURLToPath(import.meta.url)); const REPO_ROOT = resolve(__dirname, "../.."); +function 
/** Formats a dollar amount with four decimal places, e.g. `0.25` -> `$0.2500`. */
function formatUsd(usd: number): string {
  return `$${usd.toFixed(4)}`;
}

/** Formats a millisecond duration as seconds with one decimal, e.g. `1500` -> `1.5s`. */
function formatMs(ms: number): string {
  return `${(ms / 1000).toFixed(1)}s`;
}

/**
 * Prints human-readable progress for a single workflow event and updates the
 * running totals kept in `state`.
 *
 * - `task:start` records the start timestamp and logs the start.
 * - `task:complete` logs duration and cost, adds the cost to `state.spentUsd`,
 *   prints the budget line, and emits the 80% warning at most once.
 * - `task:error` logs the failure via `console.error`, with the elapsed time
 *   when a start timestamp was recorded for the task.
 * - `budget:warning` relays the executor's own warning via `console.warn`.
 *
 * Any other event type is ignored.
 *
 * @param ev - The event to render.
 * @param budgetCapUsd - Spending cap in USD, or `undefined` when no cap is set.
 * @param state - Mutable progress state shared across calls: start timestamps
 *   keyed by task name, accumulated spend, and whether the 80% warning fired.
 */
export function renderEvent(
  ev: WorkflowEvent,
  budgetCapUsd: number | undefined,
  state: {
    readonly starts: Map<string, number>;
    spentUsd: number;
    warned80: boolean;
  },
): void {
  switch (ev.type) {
    case "task:start": {
      state.starts.set(ev.taskName, ev.timestamp);
      console.log(`[progress] ▶ ${ev.taskName} started`);
      return;
    }

    case "task:complete": {
      const startedAt = state.starts.get(ev.taskName);
      // Prefer the time between the start and complete events; fall back to
      // the reported latency when no start event was seen for this task.
      const durationMs =
        startedAt !== undefined
          ? ev.timestamp - startedAt
          : ev.metrics.latencyMs;
      state.starts.delete(ev.taskName);

      state.spentUsd += ev.metrics.estimatedCost;
      console.log(
        `[progress] ✓ ${ev.taskName} completed in ${formatMs(durationMs)} | +${formatUsd(ev.metrics.estimatedCost)} (spent ${formatUsd(state.spentUsd)})`,
      );

      if (budgetCapUsd === undefined) {
        console.log(`[budget] spent ${formatUsd(state.spentUsd)} (no cap)`);
        return;
      }

      // A non-positive cap leaves no headroom: report it as fully used rather
      // than dividing by zero and printing "NaN%" / "Infinity%".
      const pct =
        budgetCapUsd > 0 ? (state.spentUsd / budgetCapUsd) * 100 : 100;
      const pctLabel = pct.toFixed(1);
      console.log(
        `[budget] ${formatUsd(state.spentUsd)} / ${formatUsd(budgetCapUsd)} (${pctLabel}%)`,
      );
      // Compare at the printed precision so the output can never show
      // "(80.0%)" while the warning is withheld (rounding or accumulated
      // floating-point error in the running sum).
      if (!state.warned80 && Number(pctLabel) >= 80) {
        state.warned80 = true;
        console.warn(
          `[budget] warning: reached ${pctLabel}% of cap ${formatUsd(budgetCapUsd)}`,
        );
      }
      return;
    }

    case "task:error": {
      const startedAt = state.starts.get(ev.taskName);
      const durationMs = startedAt !== undefined ? ev.timestamp - startedAt : 0;
      state.starts.delete(ev.taskName);
      // Without a recorded start there is no meaningful duration to show.
      const durationSuffix =
        durationMs > 0 ? ` after ${formatMs(durationMs)}` : "";
      console.error(
        `[progress] ✗ ${ev.taskName} failed${durationSuffix}: ${ev.error.message}`,
      );
      return;
    }

    case "budget:warning": {
      console.warn(
        `[budget] executor warning: spent ${formatUsd(ev.spentUsd)} > limit ${formatUsd(ev.limitUsd)}`,
      );
      return;
    }

    default:
      return;
  }
}

/**
 * Runs a workflow through `executor.stream(input)`, rendering every yielded
 * event with {@link renderEvent}, and resolves with the `outputs` and
 * `metrics` of the generator's final (done) value.
 *
 * @typeParam T - The workflow's task map; its keys shape the `outputs` record.
 * @param executor - Executor whose `stream()` yields workflow events.
 * @param input - Workflow input forwarded to `executor.stream()`.
 * @param budgetCapUsd - Optional spending cap in USD used for budget lines.
 * @returns The `outputs` and `metrics` fields of the stream's return value.
 */
export async function runWithProgress<T extends TasksMap>(
  executor: WorkflowExecutor<T>,
  input: WorkflowInput,
  budgetCapUsd?: number,
): Promise<{ outputs?: { [K in keyof T]?: unknown }; metrics?: unknown }> {
  const state = {
    starts: new Map<string, number>(),
    spentUsd: 0,
    warned80: false,
  };
  const stream = executor.stream(input);

  // Manual iteration (not `for await`) because the generator's return value
  // is needed once the stream is exhausted.
  while (true) {
    const next = await stream.next();
    if (next.done) {
      return {
        outputs: next.value.outputs,
        metrics: next.value.metrics,
      };
    }
    renderEvent(next.value, budgetCapUsd, state);
  }
}

/*
 * NOTE(review): the rest of this patch edits `main()`, which is only partially
 * visible in this excerpt. Those hunks are carried over unchanged (reflowed
 * only) rather than rewritten:
 *
 *   async function main(): Promise { // Parse argv: --dry-run flag + issue number + optional --budget=.
 *   const args = process.argv.slice(2);
 *  @@ -157,7 +251,7 @@ async function main(): Promise {
 *       budgetTracker,
 *     });
 *  -  const result = await executor.run(updatedInput);
 *  +  const result = await runWithProgress(executor, updatedInput, maxUsd);
 *     console.log("[dev-workflow] workflow complete:");
 *     console.log(
 *       JSON.stringify(
 *  @@ -182,7 +276,9 @@ async function main(): Promise {
 *     }
 *   }
 *  -main().catch((err) => {
 *  -  console.error("[dev-workflow] fatal:", err);
 *  -  process.exit(1);
 *  -});
 *  +if (import.meta.main) {
 *  +  main().catch((err) => {
 *  +    console.error("[dev-workflow] fatal:", err);
 *  +    process.exit(1);
 *  +  });
 *  +}
 */