Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dotflow/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dotflow.cli.commands.cloud import CloudGenerateCommand, CloudListCommand
from dotflow.cli.commands.deploy import DeployCommand
from dotflow.cli.commands.flow import FlowCommand
from dotflow.cli.commands.init import InitCommand
from dotflow.cli.commands.log import LogCommand
from dotflow.cli.commands.schedule import ScheduleCommand
Expand All @@ -11,6 +12,7 @@
"CloudGenerateCommand",
"CloudListCommand",
"DeployCommand",
"FlowCommand",
"InitCommand",
"LogCommand",
"ScheduleCommand",
Expand Down
35 changes: 35 additions & 0 deletions dotflow/cli/commands/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Command flow module"""

from dotflow.cli.command import Command
from dotflow.core.module import Module
from dotflow.utils.visualizer import visualize


class FlowCommand(Command):
def setup(self):
step = self.params.step
mode = self.params.mode
fmt = self.params.format

loaded = Module(value=step)
Comment thread
FernandoCelmer marked this conversation as resolved.
tasks = self._extract_tasks(loaded)
visualize(tasks=tasks, mode=mode, fmt=fmt)

@staticmethod
def _extract_tasks(obj) -> list:
"""
Accept any of the three common ways a user might point us at tasks:
1. A DotFlow instance → obj.task.queue
2. A TaskBuilder instance → obj.queue
3. A plain list[Task] → obj
"""
if hasattr(obj, "task") and hasattr(obj.task, "queue"):
return list(obj.task.queue)

if hasattr(obj, "queue"):
return list(obj.queue)

if isinstance(obj, list):
return obj

return []
38 changes: 38 additions & 0 deletions dotflow/cli/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
CloudGenerateCommand,
CloudListCommand,
DeployCommand,
FlowCommand,
InitCommand,
LogCommand,
ScheduleCommand,
Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(self, parser):
self.setup_schedule()
self.setup_cloud()
self.setup_deploy()
self.setup_flow()
self.command()

def setup_init(self):
Expand Down Expand Up @@ -210,6 +212,42 @@ def setup_deploy(self):
)
self.cmd_deploy.set_defaults(exec=DeployCommand)

def setup_flow(self):
self.cmd_flow = self.subparsers.add_parser(
"flow",
help="Visualize a workflow pipeline in the terminal",
)
self.cmd_flow = self.cmd_flow.add_argument_group(
"Usage: dotflow flow [OPTIONS]"
)

self.cmd_flow.add_argument(
"-s",
"--step",
required=True,
help="Dotted path to a DotFlow instance, TaskBuilder, or task list",
)
self.cmd_flow.add_argument(
"-m",
"--mode",
default=TypeExecution.SEQUENTIAL,
choices=[
TypeExecution.SEQUENTIAL,
TypeExecution.BACKGROUND,
TypeExecution.PARALLEL,
"sequential_group",
],
help="Execution mode to visualize (default: sequential)",
)
self.cmd_flow.add_argument(
"--format",
default="terminal",
choices=["terminal", "mermaid"],
help="Output format: terminal (default) or mermaid",
)

self.cmd_flow.set_defaults(exec=FlowCommand)

def setup_logs(self):
self.cmd_logs = self.subparsers.add_parser("logs", help="Logs")
self.cmd_logs = self.cmd_logs.add_argument_group(
Expand Down
Loading
Loading