Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions packages/engine/src/services/chunkEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import { spawn } from "child_process";
import { copyFileSync, existsSync, mkdirSync, readdirSync, statSync, writeFileSync } from "fs";
import { join, dirname } from "path";
import { trackChildProcess } from "../utils/processTracker.js";
import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js";
import { DEFAULT_CONFIG, type EngineConfig } from "../config.js";
import {
type GpuEncoder,
Expand Down Expand Up @@ -414,29 +414,37 @@ export async function encodeFramesFromDir(
const ffmpeg = spawn("ffmpeg", args);
trackChildProcess(ffmpeg);
let stderr = "";
let timedOut = false;
const cancelEscalations: Array<() => void> = [];
const onAbort = () => {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
};
if (signal) {
if (signal.aborted) {
ffmpeg.kill("SIGTERM");
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
}

const encodeTimeout = config?.ffmpegEncodeTimeout ?? DEFAULT_CONFIG.ffmpegEncodeTimeout;
const timer = setTimeout(() => {
ffmpeg.kill("SIGTERM");
timedOut = true;
cancelEscalations.push(killWithEscalation(ffmpeg));
}, encodeTimeout);

ffmpeg.stderr.on("data", (data) => {
stderr += data.toString();
});

ffmpeg.on("close", (code) => {
const cleanup = () => {
clearTimeout(timer);
for (const cancel of cancelEscalations) cancel();
if (signal) signal.removeEventListener("abort", onAbort);
};

ffmpeg.on("close", (code) => {
cleanup();
const durationMs = Date.now() - startTime;
if (signal?.aborted) {
resolve({
Expand All @@ -457,7 +465,11 @@ export async function encodeFramesFromDir(
durationMs,
framesEncoded: 0,
fileSize: 0,
error: formatFfmpegError(code, stderr),
// A timeout kill exits with a signal (code null); name the actual
// cause instead of the unhelpful "[FFmpeg] process error".
error: timedOut
? `[FFmpeg] encode timed out after ${encodeTimeout}ms`
: formatFfmpegError(code, stderr),
});
return;
}
Expand All @@ -467,8 +479,7 @@ export async function encodeFramesFromDir(
});

ffmpeg.on("error", (err) => {
clearTimeout(timer);
if (signal) signal.removeEventListener("abort", onAbort);
cleanup();
resolve({
success: false,
outputPath,
Expand Down
11 changes: 7 additions & 4 deletions packages/engine/src/services/streamingEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/

import { spawn, type ChildProcess } from "child_process";
import { trackChildProcess } from "../utils/processTracker.js";
import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js";
import { existsSync, mkdirSync, statSync } from "fs";
import { dirname } from "path";

Expand Down Expand Up @@ -390,19 +390,22 @@ export async function spawnStreamingEncoder(
let exitCode: number | null = null;
let exitPromiseResolve: ((value: void) => void) | null = null;
const exitPromise = new Promise<void>((resolve) => (exitPromiseResolve = resolve));
const cancelEscalations: Array<() => void> = [];

// Track stderr for progress and error messages
ffmpeg.stderr?.on("data", (data: Buffer) => {
stderr += data.toString();
});

ffmpeg.on("close", (code: number | null) => {
for (const cancel of cancelEscalations) cancel();
exitCode = code;
exitStatus = code === 0 ? "success" : "error";
exitPromiseResolve?.();
});

ffmpeg.on("error", (err: Error) => {
for (const cancel of cancelEscalations) cancel();
exitStatus = "error";
stderr += `\nProcess error: ${err.message}`;
exitPromiseResolve?.();
Expand All @@ -414,12 +417,12 @@ export async function spawnStreamingEncoder(
// Handle abort signal
const onAbort = () => {
if (exitStatus === "running") {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
}
};
if (signal) {
if (signal.aborted) {
ffmpeg.kill("SIGTERM");
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
Expand All @@ -440,7 +443,7 @@ export async function spawnStreamingEncoder(
if (timer) clearTimeout(timer);
timer = setTimeout(() => {
if (exitStatus === "running") {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
}
}, streamingTimeout);
};
Expand Down
31 changes: 23 additions & 8 deletions packages/engine/src/services/videoFrameExtractor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { existsSync, mkdirSync, readdirSync, rmSync } from "fs";
import { isAbsolute, join, posix, resolve, sep } from "path";
import { parseHTML } from "linkedom";
import { decodeUrlPathVariants } from "@hyperframes/core";
import { trackChildProcess } from "../utils/processTracker.js";
import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js";
import { extractMediaMetadata, type VideoMetadata } from "../utils/ffprobe.js";
import {
analyzeCompositionHdr,
Expand Down Expand Up @@ -262,34 +262,50 @@ export async function extractVideoFramesRange(
const ffmpeg = spawn("ffmpeg", args);
trackChildProcess(ffmpeg);
let stderr = "";
let timedOut = false;
const cancelEscalations: Array<() => void> = [];
const onAbort = () => {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
};
if (signal) {
if (signal.aborted) {
ffmpeg.kill("SIGTERM");
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
}

const timer = setTimeout(() => {
ffmpeg.kill("SIGTERM");
timedOut = true;
cancelEscalations.push(killWithEscalation(ffmpeg));
}, ffmpegProcessTimeout);

ffmpeg.stderr.on("data", (data) => {
stderr += data.toString();
});

ffmpeg.on("close", (code) => {
const cleanup = () => {
clearTimeout(timer);
for (const cancel of cancelEscalations) cancel();
if (signal) signal.removeEventListener("abort", onAbort);
};

ffmpeg.on("close", (code) => {
cleanup();
if (signal?.aborted) {
reject(new Error("Video frame extraction cancelled"));
return;
}
if (code !== 0) {
reject(new Error(`FFmpeg exited with code ${code}: ${stderr.slice(-500)}`));
// A timeout kill exits with a signal (code null); name the actual
// cause instead of the unhelpful "exited with code null".
reject(
new Error(
timedOut
? `FFmpeg frame extraction timed out after ${ffmpegProcessTimeout}ms: ${stderr.slice(-500)}`
: `FFmpeg exited with code ${code}: ${stderr.slice(-500)}`,
),
);
return;
}

Expand All @@ -314,8 +330,7 @@ export async function extractVideoFramesRange(
});

ffmpeg.on("error", (err) => {
clearTimeout(timer);
if (signal) signal.removeEventListener("abort", onAbort);
cleanup();
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
reject(new Error("[FFmpeg] ffmpeg not found"));
} else {
Expand Down
58 changes: 57 additions & 1 deletion packages/engine/src/utils/processTracker.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach } from "vitest";
import { spawn } from "node:child_process";
import { trackChildProcess, killTrackedProcesses } from "./processTracker.js";
import { trackChildProcess, killTrackedProcesses, killWithEscalation } from "./processTracker.js";

// Reset tracked set between tests by killing everything
beforeEach(() => {
Expand Down Expand Up @@ -72,3 +72,59 @@ describe("killTrackedProcesses", () => {
killTrackedProcesses();
});
});

// POSIX-only suite. Windows maps kill("SIGTERM") to TerminateProcess, which
// cannot be trapped or ignored, so the SIGKILL escalation path is never
// reachable there. Skipped on win32, same as the runFfmpeg kill-escalation
// suite.
describe.skipIf(process.platform === "win32")("killWithEscalation", () => {
  type Child = ReturnType<typeof spawn>;

  // How long to let bash install its TERM trap before signalling it.
  const TRAP_INSTALL_DELAY_MS = 200;

  /** Resolves after `ms` milliseconds. */
  const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

  /** Resolves once `child` emits `close`. */
  const untilClosed = (child: Child) =>
    new Promise<void>((resolve) => child.on("close", () => resolve()));

  /**
   * Spawns a bash shell that ignores SIGTERM, then waits long enough for the
   * trap to be in place. Signalling earlier would race the trap setup and
   * SIGTERM would win legitimately.
   */
  const spawnTermIgnoringShell = async (): Promise<Child> => {
    const child = spawn("bash", ["-c", "trap '' TERM; sleep 60"], { stdio: "ignore" });
    await pause(TRAP_INSTALL_DELAY_MS);
    return child;
  };

  it("kills a SIGTERM-compliant process", async () => {
    const child = spawn("sleep", ["60"], { stdio: "ignore" });

    // Subscribe to `close` before signalling so the event cannot be missed.
    const closed = untilClosed(child);
    const cancelEscalation = killWithEscalation(child);

    await closed;
    cancelEscalation();
    expect(child.signalCode).toBe("SIGTERM");
  });

  it("escalates to SIGKILL when the process ignores SIGTERM", async () => {
    const child = await spawnTermIgnoringShell();

    const closed = untilClosed(child);
    const cancelEscalation = killWithEscalation(child, 100);

    await closed;
    cancelEscalation();
    expect(child.signalCode).toBe("SIGKILL");
  }, 5000);

  it("cancel clears the pending escalation", async () => {
    const child = await spawnTermIgnoringShell();

    const cancelEscalation = killWithEscalation(child, 100);
    cancelEscalation();

    // Wait well past the 100ms grace period. The shell trapped SIGTERM and the
    // SIGKILL follow-up was cancelled, so it must still be running.
    await pause(300);
    expect(child.exitCode).toBeNull();
    expect(child.signalCode).toBeNull();

    // Tear the survivor down explicitly so it does not outlive the test.
    child.kill("SIGKILL");
    await untilClosed(child);
  }, 5000);

  it("does not throw on an already-exited process", async () => {
    const child = spawn("true", { stdio: "ignore" });
    await untilClosed(child);

    const cancelEscalation = killWithEscalation(child);
    cancelEscalation();
  });
});
35 changes: 35 additions & 0 deletions packages/engine/src/utils/processTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,41 @@ export function trackChildProcess(proc: ChildProcess): void {
proc.once("error", remove);
}

/** Default delay, in milliseconds, between the SIGTERM and the follow-up SIGKILL. */
const KILL_ESCALATION_GRACE_MS = 500;

/**
 * Terminate one child process: send SIGTERM immediately, then send SIGKILL if
 * the process still reports no exit code and no terminating signal once the
 * grace period has elapsed.
 *
 * This is the single-process counterpart of killTrackedProcesses(), intended
 * for timeout/abort kills where the caller is waiting on the child's `close`
 * event. A child that ignores SIGTERM (stuck I/O, frozen pipe) would otherwise
 * never emit `close`, leaving the waiting promise pending forever.
 *
 * @param proc - The child process to terminate.
 * @param graceMs - How long to wait after SIGTERM before sending SIGKILL.
 * @returns A function that cancels the pending SIGKILL; call it once the
 *   process has exited so the escalation timer does not outlive it.
 */
export function killWithEscalation(
  proc: ChildProcess,
  graceMs: number = KILL_ESCALATION_GRACE_MS,
): () => void {
  // Best-effort delivery: a throw from kill() is treated as "already exited".
  const deliver = (sig: "SIGTERM" | "SIGKILL"): void => {
    try {
      proc.kill(sig);
    } catch {
      // Already exited.
    }
  };

  deliver("SIGTERM");

  const escalation = setTimeout(() => {
    const stillRunning = proc.exitCode === null && proc.signalCode === null;
    if (stillRunning) {
      deliver("SIGKILL");
    }
  }, graceMs);
  // The pending escalation must not keep the event loop alive on its own.
  escalation.unref();

  return () => {
    clearTimeout(escalation);
  };
}

/**
* SIGTERM all tracked child processes, then SIGKILL any that survive
* after a short grace period.
Expand Down
74 changes: 72 additions & 2 deletions packages/engine/src/utils/runFfmpeg.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { describe, expect, it } from "vitest";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { chmodSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

import { formatFfmpegError } from "./runFfmpeg.js";
import { formatFfmpegError, runFfmpeg } from "./runFfmpeg.js";

describe("formatFfmpegError", () => {
it("reports exit code alone when stderr is empty", () => {
Expand Down Expand Up @@ -44,3 +47,70 @@ describe("formatFfmpegError", () => {
expect(formatFfmpegError(null, "spawn ffmpeg ENOENT")).toBe("[FFmpeg] spawn ffmpeg ENOENT");
});
});

// Shadows the real ffmpeg via PATH with a script that traps SIGTERM. Before
// the SIGKILL escalation, both kill paths sent SIGTERM only, so a stuck
// ffmpeg never emitted `close` and these awaits hung forever (the test would
// fail by timeout). Shell-script shim, so skipped on Windows.
//
// The shim is careful about two things:
// - It prints "ready" to stderr after installing the trap, and the tests
//   only treat a kill as trap-protected once that line is seen. Killing
//   earlier races the trap setup and SIGTERM would win legitimately.
// - It blocks on the builtin `read` instead of spawning `sleep`, so no
//   child process inherits the stdio pipes. A SIGKILLed shim closes its
//   pipes immediately and `close` fires right away; an inherited pipe
//   would defer `close` until the child exits.
describe.skipIf(process.platform === "win32")("runFfmpeg kill escalation", () => {
  let fakeBinDir: string | undefined;
  let originalPath: string | undefined;
  // Set only once PATH has actually been rewritten, so teardown never
  // "restores" a value that was never captured.
  let pathOverridden = false;

  beforeAll(async () => {
    const binDir = mkdtempSync(join(tmpdir(), "hf-fake-ffmpeg-"));
    fakeBinDir = binDir;
    const script = join(binDir, "ffmpeg");
    writeFileSync(script, '#!/usr/bin/env bash\ntrap "" TERM\necho ready >&2\nread -t 60 _\n');
    chmodSync(script, 0o755);
    originalPath = process.env.PATH;
    process.env.PATH = `${binDir}:${process.env.PATH ?? ""}`;
    pathOverridden = true;

    // Warm-up run: the first exec of a fresh script file can be slow
    // (macOS scans it on first run). Waiting for "ready" once here keeps
    // the timed tests below from racing that one-time latency.
    const controller = new AbortController();
    await runFfmpeg([], {
      signal: controller.signal,
      onStderr: (line) => {
        if (line.includes("ready")) controller.abort();
      },
    });
  }, 15_000);

  afterAll(() => {
    if (pathOverridden) {
      // process.env stringifies whatever is assigned to it, so writing
      // `undefined` back would leave PATH set to the literal "undefined".
      // Remove the variable when it was originally unset.
      if (originalPath === undefined) {
        delete process.env.PATH;
      } else {
        process.env.PATH = originalPath;
      }
    }
    // The temp dir is missing only if setup failed before creating it.
    if (fakeBinDir !== undefined) {
      rmSync(fakeBinDir, { recursive: true, force: true });
    }
  });

  it("resolves instead of hanging when a timed-out ffmpeg ignores SIGTERM", async () => {
    const result = await runFfmpeg([], { timeout: 500 });
    expect(result.success).toBe(false);
    expect(result.exitCode).toBeNull();
    // Resolution must have come through the SIGKILL escalation: timeout
    // (500ms) plus the escalation grace period, not a SIGTERM exit at
    // the timeout mark.
    expect(result.durationMs).toBeGreaterThanOrEqual(900);
  }, 5000);

  it("resolves instead of hanging when an aborted ffmpeg ignores SIGTERM", async () => {
    const controller = new AbortController();
    const result = await runFfmpeg([], {
      signal: controller.signal,
      onStderr: (line) => {
        // Abort only after the trap is installed so SIGTERM is guaranteed
        // to be ignored and the SIGKILL escalation is what unblocks us.
        if (line.includes("ready")) controller.abort();
      },
    });
    expect(result.success).toBe(false);
    expect(result.durationMs).toBeGreaterThanOrEqual(450);
  }, 5000);
});
Loading
Loading