Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/scope/cloud/livepeer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pathlib import Path
from typing import Any

import aiohttp
import click
import httpx
import uvicorn
Expand Down Expand Up @@ -210,11 +211,19 @@ async def _media_input_loop(
frame_processor.put(frame)
except asyncio.CancelledError:
raise
except aiohttp.ClientPayloadError as exc:
# Orchestrator truncated the transfer mid-stream (e.g. TransferEncodingError 400)
# or dropped the connection. This is a network-level disconnect — treat as a
# graceful stop rather than an application error.
logger.warning("Media input loop: orchestrator disconnected mid-stream: %s", exc)
except Exception as exc:
logger.error("Media input loop failed: %s", exc)
finally:
try:
await media_output.close()
except aiohttp.ClientConnectorError as exc:
# Host already unreachable (orchestrator went down); suppress cleanup errors.
logger.debug("Media output close: orchestrator unreachable (suppressed): %s", exc)
except Exception as exc:
logger.warning("Media output close failed: %s", exc)
if session.media_output is media_output:
Expand Down
11 changes: 10 additions & 1 deletion src/scope/server/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1104,10 +1104,19 @@ def _setup_pipelines_sync(self):
if self.parameters.get("vace_enabled") and self.parameters.get(
"vace_use_input_video", True
):
# Only the *last* VACEEnabledPipeline in the chain should
# receive raw input video as vace_input_frames. Earlier
# VACEEnabledPipeline instances (e.g. yolo_mask preprocessors)
# produce their own VACE frames that downstream pipelines
# consume — they must NOT also be wired to receive raw video
# on vace_input_frames, as that would create a fan-in conflict.
last_vace_pid: str | None = None
for pid in self.pipeline_ids:
pipeline = self.pipeline_manager.get_pipeline_by_id(pid)
if isinstance(pipeline, VACEEnabledPipeline):
vace_input_video_ids.add(pid)
last_vace_pid = pid
if last_vace_pid is not None:
vace_input_video_ids.add(last_vace_pid)

api_graph = build_linear_graph(
self.pipeline_ids,
Expand Down
104 changes: 95 additions & 9 deletions src/scope/server/graph_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
from typing import TYPE_CHECKING, Any

from .graph_schema import GraphConfig
from .pipeline_manager import PipelineNotAvailableException
from .pipeline_processor import PipelineProcessor

if TYPE_CHECKING:
from .pipeline_manager import PipelineManager

# VACE port names that VACEEnabledPipeline instances always accept
_VACE_INPUT_PORTS = frozenset({"vace_input_frames", "vace_input_masks"})

logger = logging.getLogger(__name__)

# Default queue sizes (match pipeline_processor)
Expand Down Expand Up @@ -76,17 +80,78 @@ def build_graph(
# Validate edge ports against pipeline input/output declarations
_validate_edge_ports(graph, pipeline_manager)

# 1) Create one queue per edge (all edges are stream; frame-by-frame)
# 1a) Resolve fan-in conflicts before creating queues.
#
# When both a source-node edge and a pipeline-node edge target the same
# input port (e.g. "input:video → longlive:vace_input_frames" AND
# "yolo_mask:vace_input_frames → longlive:vace_input_frames"), prefer the
# pipeline-node edge — it carries preprocessed data and is the intended
# signal. Raise only if two pipeline-to-pipeline edges conflict, which
# is always a graph authoring error.
source_node_ids = set(graph.get_source_node_ids())
stream_edges = [e for e in graph.edges if e.kind == "stream"]

# Group by destination (to_node, to_port)
port_edges: dict[tuple[str, str], list] = {}
for e in stream_edges:
port_edges.setdefault((e.to_node, e.to_port), []).append(e)

resolved_edges = []
for (to_node, to_port), edge_list in port_edges.items():
if len(edge_list) == 1:
resolved_edges.append(edge_list[0])
continue

pipeline_edges = [e for e in edge_list if e.from_node not in source_node_ids]
source_edges = [e for e in edge_list if e.from_node in source_node_ids]

if len(pipeline_edges) > 1:
# Two pipeline nodes both feed the same input — genuine conflict.
conflict_desc = ", ".join(
f"{e.from_node!r}:{e.from_port!r}" for e in pipeline_edges
)
raise ValueError(
f"Duplicate stream edges to the same input port: "
f"node={to_node!r}, port={to_port!r}. "
f"Multiple pipeline sources conflict: [{conflict_desc}]. "
f"Fan-in to a single port is not supported."
)
elif pipeline_edges:
# One pipeline edge wins over any source edges — expected pattern
# when a preprocessor forwards VACE frames while the Workflow
# Builder also has "use input video as VACE" toggled on.
discarded = [f"{e.from_node!r}:{e.from_port!r}" for e in source_edges]
logger.info(
"Resolved fan-in on %s:%s — keeping pipeline edge from %r, "
"discarding source edge(s): [%s]",
to_node,
to_port,
pipeline_edges[0].from_node,
", ".join(discarded),
)
resolved_edges.append(pipeline_edges[0])
else:
# Multiple source edges — still a conflict.
conflict_desc = ", ".join(
f"{e.from_node!r}:{e.from_port!r}" for e in source_edges
)
raise ValueError(
f"Duplicate stream edges to the same input port: "
f"node={to_node!r}, port={to_port!r}. "
f"Multiple source edges conflict: [{conflict_desc}]. "
f"Fan-in to a single port is not supported."
)

# Rebuild graph with resolved (deduplicated) edges so downstream logic
# only sees the authoritative set.
non_stream_edges = [e for e in graph.edges if e.kind != "stream"]
graph = graph.model_copy(update={"edges": non_stream_edges + resolved_edges})

# 1b) Create one queue per resolved edge.
stream_queues: dict[tuple[str, str], queue.Queue] = {}
for e in graph.edges:
if e.kind == "stream":
key = (e.to_node, e.to_port)
if key in stream_queues:
raise ValueError(
f"Duplicate stream edges to the same input port: "
f"node={e.to_node!r}, port={e.to_port!r}. "
f"Fan-in to a single port is not supported."
)
stream_queues[key] = queue.Queue(maxsize=DEFAULT_INPUT_QUEUE_MAXSIZE)

# 2) Create a processor per pipeline node and wire input_queues per port
Expand Down Expand Up @@ -221,14 +286,35 @@ def _validate_edge_ports(
Raises:
ValueError: If any edge references an undeclared port.
"""
from scope.core.pipelines.wan2_1.vace import VACEEnabledPipeline

# Build a map of node_id -> (declared_inputs, declared_outputs)
port_map: dict[str, tuple[set[str], set[str]]] = {}
for node in graph.nodes:
if node.type != "pipeline" or node.pipeline_id is None:
continue
pipeline = pipeline_manager.get_pipeline_by_id(node.id)
try:
pipeline = pipeline_manager.get_pipeline_by_id(node.id)
except PipelineNotAvailableException:
# Pipeline is being reloaded (e.g. vace_enabled toggled).
# Skip port validation for this node rather than crashing.
logger.warning(
"Pipeline %r unavailable during port validation (reloading?); "
"skipping port checks for this node.",
node.id,
)
continue
config_class = pipeline.get_config_class()
port_map[node.id] = (set(config_class.inputs), set(config_class.outputs))
declared_inputs = set(config_class.inputs)
declared_outputs = set(config_class.outputs)

# VACEEnabledPipeline instances always accept VACE input ports,
# even when the pipeline was loaded with vace_enabled=False and
# config_class.inputs only lists "video".
if isinstance(pipeline, VACEEnabledPipeline):
declared_inputs |= _VACE_INPUT_PORTS

port_map[node.id] = (declared_inputs, declared_outputs)

errors: list[str] = []
for e in graph.edges:
Expand Down
Loading