diff --git a/tests/chainable_commands_experiment/__init__.py b/tests/chainable_commands_experiment/__init__.py new file mode 100644 index 000000000..1142f4600 --- /dev/null +++ b/tests/chainable_commands_experiment/__init__.py @@ -0,0 +1 @@ +"""Typed command-chain API experiments for libtmux.""" diff --git a/tests/chainable_commands_experiment/ast_api.py b/tests/chainable_commands_experiment/ast_api.py new file mode 100644 index 000000000..87d81f488 --- /dev/null +++ b/tests/chainable_commands_experiment/ast_api.py @@ -0,0 +1,98 @@ +"""AST-assisted command-chain experiment.""" + +from __future__ import annotations + +import ast +import inspect +import textwrap +from collections.abc import Callable + +from .shared import ( + CommandCall, + CommandSequence, + new_window_call, + split_window_call, +) + + +class UnsupportedAstShape(ValueError): + """Raised when a callable uses syntax outside the AST prototype.""" + + +class AstCommandProxy: + """Typed proxy used by the AST experiment.""" + + def __init__(self) -> None: + """Initialize an empty proxy call list.""" + self._calls: list[CommandCall] = [] + + def to_sequence(self) -> CommandSequence: + """Return accumulated calls as a command sequence.""" + return CommandSequence(tuple(self._calls)) + + def new_window( + self, + *, + window_name: str | None = None, + detach: bool = True, + ) -> CommandCall: + """Add a ``new-window`` proxy call.""" + call = new_window_call(window_name=window_name, detach=detach) + self._calls.append(call) + return call + + def split_window( + self, + *, + horizontal: bool = False, + percentage: int | None = None, + ) -> CommandCall: + """Add a ``split-window`` proxy call.""" + call = split_window_call( + horizontal=horizontal, + percentage=percentage, + ) + self._calls.append(call) + return call + + +def command_names_from_ast( + function: Callable[[AstCommandProxy], object], +) -> tuple[str, ...]: + """Extract command method names from a restricted callable AST.""" + function_def = _function_def_from_source(function) + if any( + isinstance(node, ast.For | ast.While | ast.If | ast.Match) + for node in ast.walk(function_def) + ): + msg = "control flow is outside this AST prototype" + raise UnsupportedAstShape(msg) + + names = [ + node.func.attr + for node in ast.walk(function_def) + if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) + ] + if not names: + msg = "no proxy command calls found" + raise UnsupportedAstShape(msg) + return tuple(names) + + +def from_callable(function: Callable[[AstCommandProxy], object]) -> CommandSequence: + """Validate a callable's AST, then execute it against a typed proxy.""" + command_names_from_ast(function) + proxy = AstCommandProxy() + function(proxy) + return proxy.to_sequence() + + +def _function_def_from_source( + function: Callable[[AstCommandProxy], object], +) -> ast.FunctionDef: + source = textwrap.dedent(inspect.getsource(function)) + module = ast.parse(source) + if len(module.body) != 1 or not isinstance(module.body[0], ast.FunctionDef): + msg = "expected a single function definition" + raise UnsupportedAstShape(msg) + return module.body[0] diff --git a/tests/chainable_commands_experiment/async_deferred_plan_api.py b/tests/chainable_commands_experiment/async_deferred_plan_api.py new file mode 100644 index 000000000..00947196c --- /dev/null +++ b/tests/chainable_commands_experiment/async_deferred_plan_api.py @@ -0,0 +1,172 @@ +"""Asyncio facade over deferred query-command plan experiments.""" + +from __future__ import annotations + +import dataclasses +import typing as t +from dataclasses import dataclass + +from . import deferred_plan_api as sync_api +from .shared import Arg, CommandCall, CommandResultLike + +MappedT = t.TypeVar("MappedT") +ResultT = t.TypeVar("ResultT") + +NoCommandsResolved = sync_api.NoCommandsResolved + + +class AsyncCommandRunner(t.Protocol): + """Object capable of asynchronously dispatching one tmux command argv.""" + + async def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> CommandResultLike: + """Dispatch one tmux command asynchronously.""" + ... + + +class AsyncSnapshotProvider(t.Protocol): + """Object that can asynchronously provide a pure tmux snapshot.""" + + async def snapshot(self) -> sync_api.TmuxSnapshot: + """Return a tmux snapshot asynchronously.""" + ... + + +class AsyncPlanRunner( + AsyncCommandRunner, + AsyncSnapshotProvider, + t.Protocol, +): + """Runner that can resolve async queries and dispatch async commands.""" + + +AsyncSnapshotSource: t.TypeAlias = sync_api.TmuxSnapshot | AsyncSnapshotProvider + + +@dataclass(frozen=True, slots=True) +class PaneQuery: + """Async lazy pane query backed by the sync deferred query object.""" + + query: sync_api.PaneQuery + + def filter(self, *, active: bool) -> PaneQuery: + """Return a query filtered by active state.""" + return dataclasses.replace(self, query=self.query.filter(active=active)) + + def order_by(self, field: sync_api.OrderField) -> PaneQuery: + """Return a query ordered by a known pane field.""" + return dataclasses.replace(self, query=self.query.order_by(field)) + + def limit(self, count: int) -> PaneQuery: + """Return a query capped to ``count`` rows.""" + return dataclasses.replace(self, query=self.query.limit(count)) + + async def all(self, source: AsyncSnapshotSource) -> list[sync_api.PaneRef]: + """Evaluate the query against an async snapshot source.""" + snapshot = await _resolve_snapshot(source) + return self.query.all(snapshot) + + async def first( + self, + source: AsyncSnapshotSource, + ) -> sync_api.PaneRef | None: + """Evaluate the query and return its first row.""" + snapshot = await _resolve_snapshot(source) + return self.query.first(snapshot) + + def map( + self, + mapper: t.Callable[[sync_api.PaneRef], MappedT], + ) -> MappedPaneQuery[MappedT]: + """Return a data transformation query.""" + return MappedPaneQuery(query=self, mapper=mapper) + + def each(self, mapper: sync_api.CommandMapper) -> CommandPlan[None]: + """Return a deferred async side-effect command plan.""" + return self.flat_map(mapper) + + def flat_map(self, mapper: sync_api.CommandMapper) -> CommandPlan[None]: + """Return a deferred async multi-command side-effect plan.""" + return CommandPlan(query=self, mapper=mapper) + + +@dataclass(frozen=True, slots=True) +class MappedPaneQuery(t.Generic[MappedT]): + """Async data-only query transformation over pane refs.""" + + query: PaneQuery + mapper: t.Callable[[sync_api.PaneRef], MappedT] + + async def all(self, source: AsyncSnapshotSource) -> list[MappedT]: + """Evaluate the query and transform every row.""" + return [self.mapper(row) for row in await self.query.all(source)] + + async def first(self, source: AsyncSnapshotSource) -> MappedT | None: + """Evaluate the query and transform the first row.""" + row = await self.query.first(source) + if row is None: + return None + return self.mapper(row) + + +@dataclass(frozen=True, slots=True) +class CommandSequence: + """Async wrapper around a resolved sync command sequence.""" + + sequence: sync_api.CommandSequence + + @property + def calls(self) -> tuple[CommandCall, ...]: + """Return resolved command calls.""" + return self.sequence.calls + + def argvs(self) -> tuple[tuple[str, ...], ...]: + """Render each command independently.""" + return self.sequence.argvs() + + def argv(self) -> tuple[str, ...]: + """Render one native tmux semicolon command sequence.""" + return self.sequence.argv() + + async def run(self, runner: AsyncCommandRunner) -> CommandResultLike: + """Dispatch the sequence through one async runner call.""" + argv = self.argv() + return await runner.cmd(argv[0], *argv[1:]) + + +@dataclass(frozen=True, slots=True) +class CommandPlan(t.Generic[ResultT]): + """Async command plan that resolves a query into commands.""" + + query: PaneQuery + mapper: sync_api.CommandMapper + + async def to_sequence(self, source: AsyncSnapshotSource) -> CommandSequence: + """Resolve the async query and compile mapped commands.""" + snapshot = await _resolve_snapshot(source) + plan = self.query.query.flat_map(self.mapper) + return CommandSequence(plan.to_sequence(snapshot)) + + async def run(self: CommandPlan[None], runner: AsyncPlanRunner) -> None: + """Resolve, compile, and execute the plan through one async dispatch.""" + try: + sequence = await self.to_sequence(runner) + except NoCommandsResolved: + return None + await sequence.run(runner) + return None + + +def panes() -> PaneQuery: + """Start an async lazy pane query.""" + return PaneQuery(sync_api.panes()) + + +async def _resolve_snapshot(source: AsyncSnapshotSource) -> sync_api.TmuxSnapshot: + if isinstance(source, sync_api.TmuxSnapshot): + return source + return await source.snapshot() diff --git a/tests/chainable_commands_experiment/async_query_api.py b/tests/chainable_commands_experiment/async_query_api.py new file mode 100644 index 000000000..2894177f8 --- /dev/null +++ b/tests/chainable_commands_experiment/async_query_api.py @@ -0,0 +1,57 @@ +"""Piccolo-style async query experiment.""" + +from __future__ import annotations + +import dataclasses +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class AsyncPaneRow: + """Small async pane row.""" + + pane_id: str + active: bool + + +@dataclass(frozen=True, slots=True) +class AsyncPaneRunner: + """Async runner returning fixed pane rows.""" + + rows: tuple[AsyncPaneRow, ...] + + async def list_panes(self) -> list[AsyncPaneRow]: + """Return pane rows asynchronously.""" + return list(self.rows) + + +@dataclass(frozen=True, slots=True) +class AsyncPaneQuery: + """Immutable async pane query.""" + + active_filter: bool | None = None + limit_count: int | None = None + + def where(self, *, active: bool) -> AsyncPaneQuery: + """Return a query filtered by active state.""" + return dataclasses.replace(self, active_filter=active) + + def limit(self, count: int) -> AsyncPaneQuery: + """Return a query capped to ``count`` rows.""" + return dataclasses.replace(self, limit_count=count) + + async def all(self, runner: AsyncPaneRunner) -> list[AsyncPaneRow]: + """Evaluate the async query.""" + rows = await runner.list_panes() + if self.active_filter is not None: + rows = [row for row in rows if row.active is self.active_filter] + if self.limit_count is not None: + rows = rows[: self.limit_count] + return rows + + async def first(self, runner: AsyncPaneRunner) -> AsyncPaneRow | None: + """Return the first matching row asynchronously.""" + rows = await self.limit(1).all(runner) + if not rows: + return None + return rows[0] diff --git a/tests/chainable_commands_experiment/auto_batch_api.py b/tests/chainable_commands_experiment/auto_batch_api.py new file mode 100644 index 000000000..daee3cb65 --- /dev/null +++ b/tests/chainable_commands_experiment/auto_batch_api.py @@ -0,0 +1,85 @@ +"""Transparent auto-batch experiment for self-returning APIs.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from typing_extensions import Self + +from .shared import ( + Arg, + CommandCall, + CommandSequence, + rename_window_call, + select_layout_call, + show_option_call, +) + + +class DeferredOutputUnavailable(RuntimeError): + """Raised when a deferred command result is inspected before dispatch.""" + + +@dataclass(frozen=True, slots=True) +class DeferredCommandResult: + """Placeholder result returned by a transparent auto-batch target.""" + + call: CommandCall + + @property + def stdout(self) -> list[str]: + """Reject immediate stdout access.""" + msg = "deferred command output is unavailable until the chain is run" + raise DeferredOutputUnavailable(msg) + + @property + def stderr(self) -> list[str]: + """Reject immediate stderr access.""" + msg = "deferred command errors are unavailable until the chain is run" + raise DeferredOutputUnavailable(msg) + + @property + def returncode(self) -> int: + """Reject immediate return-code access.""" + msg = "deferred command status is unavailable until the chain is run" + raise DeferredOutputUnavailable(msg) + + +class AutoBatchTarget: + """Small target object that accumulates existing ``cmd``-style calls.""" + + def __init__(self) -> None: + """Initialize an empty pending call list.""" + self._calls: list[CommandCall] = [] + + def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> DeferredCommandResult: + """Add a command call instead of dispatching it.""" + call = CommandCall(cmd, args, target=target) + self._calls.append(call) + return DeferredCommandResult(call) + + def to_sequence(self) -> CommandSequence: + """Return the accumulated calls as a sequence.""" + return CommandSequence(tuple(self._calls)) + + def rename_window(self, new_name: str) -> Self: + """Add a self-returning ``rename-window`` method.""" + call = rename_window_call(new_name) + self._calls.append(call) + return self + + def select_layout(self, layout: str) -> Self: + """Add a self-returning ``select-layout`` method.""" + call = select_layout_call(layout) + self._calls.append(call) + return self + + def show_option(self, option_name: str) -> list[str]: + """Demonstrate why immediate-output methods cannot auto-batch.""" + call = show_option_call(option_name) + return self.cmd(call.name, *call.args).stdout diff --git a/tests/chainable_commands_experiment/builder_api.py b/tests/chainable_commands_experiment/builder_api.py new file mode 100644 index 000000000..35b2cf4e6 --- /dev/null +++ b/tests/chainable_commands_experiment/builder_api.py @@ -0,0 +1,43 @@ +"""Explicit builder experiment for typed command chains.""" + +from __future__ import annotations + +from .shared import ( + CommandCall, + CommandSequence, + new_window_call, + split_window_call, +) + + +class Commands: + """Completion-friendly command factory.""" + + def new_window( + self, + *, + window_name: str | None = None, + detach: bool = True, + ) -> CommandCall: + """Build a ``new-window`` call.""" + return new_window_call(window_name=window_name, detach=detach) + + def split_window( + self, + *, + horizontal: bool = False, + percentage: int | None = None, + ) -> CommandCall: + """Build a ``split-window`` call.""" + return split_window_call(horizontal=horizontal, percentage=percentage) + + +commands = Commands() + + +def sequence( + first: CommandCall, + *rest: CommandCall, +) -> CommandSequence: + """Create an immutable command sequence expression.""" + return CommandSequence((first, *rest)) diff --git a/tests/chainable_commands_experiment/command_object_api.py b/tests/chainable_commands_experiment/command_object_api.py new file mode 100644 index 000000000..79d75d694 --- /dev/null +++ b/tests/chainable_commands_experiment/command_object_api.py @@ -0,0 +1,378 @@ +"""Explicit command-object experiment for typed command values.""" + +from __future__ import annotations + +import types +import typing as t +from dataclasses import dataclass + +from typing_extensions import Self + +from .shared import ( + Arg, + CommandCall, + CommandResultLike, + CommandRunner, + CommandSequence as SharedCommandSequence, + CommandSpec, +) + + +class CommandValue: + """Base for command objects that are values before they are effects.""" + + spec: t.ClassVar[CommandSpec] + + def to_call(self) -> CommandCall: + """Compile this command object to the shared command-call IR.""" + raise NotImplementedError + + def argv(self) -> tuple[str, ...]: + """Render this command object as tmux argv tokens.""" + return self.to_call().argv() + + def run(self, runner: CommandRunner) -> CommandResultLike: + """Dispatch this command object through an explicit runner boundary.""" + call = self.to_call() + return runner.cmd(call.name, *call.args, target=call.target) + + def then(self, other: Commandish) -> CommandObjectSequence: + """Compose this command object with another command value.""" + return CommandObjectSequence((self.to_call(),)).then(other) + + def __rshift__(self, other: Commandish) -> CommandObjectSequence: + """Compose command objects with ``>>``.""" + return self.then(other) + + +Commandish: t.TypeAlias = CommandValue | CommandCall | SharedCommandSequence + + +@dataclass(frozen=True, slots=True) +class CommandObjectSequence(SharedCommandSequence): + """Command sequence that also accepts command-object values.""" + + def then(self, other: Commandish) -> CommandObjectSequence: + """Return a sequence with another command-ish value appended.""" + return CommandObjectSequence((*self.calls, *_to_sequence(other).calls)) + + def __rshift__(self, other: Commandish) -> CommandObjectSequence: + """Compose command-object sequences with ``>>``.""" + return self.then(other) + + +class ServerCmd: + """Explicit command objects for server-scoped commands.""" + + @dataclass(frozen=True, slots=True) + class SetOption(CommandValue): + """Command object for ``set-option``.""" + + option_name: str + value: str + target: str | int | None = None + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="set-option", + scope="server", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + return CommandCall( + self.spec.name, + ("-gq", self.option_name, self.value), + target=self.target, + ) + + @dataclass(frozen=True, slots=True) + class ShowOption(CommandValue): + """Command object for ``show-option``.""" + + option_name: str + target: str | int | None = None + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="show-option", + scope="server", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + return CommandCall( + self.spec.name, + ("-gqv", self.option_name), + target=self.target, + ) + + @dataclass(frozen=True, slots=True) + class HasSession(CommandValue): + """Command object for ``has-session``.""" + + session_name: str + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="has-session", + scope="server", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + return CommandCall(self.spec.name, ("-t", self.session_name)) + + +class SessionCmd: + """Explicit command objects for session-scoped commands.""" + + @dataclass(frozen=True, slots=True) + class NewWindow(CommandValue): + """Command object for ``new-window``.""" + + target: str | int | None = None + window_name: str | None = None + detach: bool = True + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="new-window", + scope="session", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + args: list[Arg] = [] + if self.detach: + args.append("-d") + if self.window_name is not None: + args.extend(("-n", self.window_name)) + return CommandCall(self.spec.name, tuple(args), target=self.target) + + +class WindowCmd: + """Explicit command objects for window-scoped commands.""" + + @dataclass(frozen=True, slots=True) + class RenameWindow(CommandValue): + """Command object for ``rename-window``.""" + + name: str + target: str | int | None = None + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="rename-window", + scope="window", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + return CommandCall(self.spec.name, (self.name,), target=self.target) + + @dataclass(frozen=True, slots=True) + class SelectLayout(CommandValue): + """Command object for ``select-layout``.""" + + layout: str + target: str | int | None = None + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="select-layout", + scope="window", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + return CommandCall(self.spec.name, (self.layout,), target=self.target) + + +class PaneCmd: + """Explicit command objects for pane-scoped commands.""" + + @dataclass(frozen=True, slots=True) + class SplitWindow(CommandValue): + """Command object for ``split-window``.""" + + target: str | int | None = None + horizontal: bool = False + percentage: int | None = None + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="split-window", + scope="pane", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + args: list[Arg] = [] + if self.horizontal: + args.append("-h") + if self.percentage is not None: + args.extend(("-p", self.percentage)) + return CommandCall(self.spec.name, tuple(args), target=self.target) + + @dataclass(frozen=True, slots=True) + class CapturePane(CommandValue): + """Command object for ``capture-pane``.""" + + target: str | int | None = None + print_output: bool = True + + spec: t.ClassVar[CommandSpec] = CommandSpec( + name="capture-pane", + scope="pane", + ) + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + args: tuple[Arg, ...] = ("-p",) if self.print_output else () + return CommandCall(self.spec.name, args, target=self.target) + + +def CommandSequenceBuilder( + first: CommandValue, + *rest: CommandValue, +) -> CommandObjectSequence: + """Build a native tmux command sequence from command objects.""" + return CommandObjectSequence( + tuple(command.to_call() for command in (first, *rest)), + ) + + +class CommandBatch: + """Accumulate command-object factories in explicit namespaces.""" + + def __init__(self) -> None: + """Initialize command namespaces.""" + self._commands: list[CommandValue] = [] + self.server = ServerCommands(self) + self.session = SessionCommands(self) + self.window = WindowCommands(self) + self.pane = PaneCommands(self) + + def __enter__(self) -> Self: + """Enter the batch context.""" + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + traceback: types.TracebackType | None, + ) -> None: + """Leave the batch context.""" + + def add(self, command: CommandT) -> CommandT: + """Add a command object and return its concrete type.""" + self._commands.append(command) + return command + + def to_sequence(self) -> CommandObjectSequence: + """Return accumulated commands as a native tmux command sequence.""" + return CommandSequenceBuilder(*self._commands) + + +CommandT = t.TypeVar("CommandT", bound=CommandValue) + + +class ServerCommands: + """Typed namespace for server command objects.""" + + def __init__(self, batch: CommandBatch) -> None: + """Store the parent batch.""" + self._batch = batch + + def set_option( + self, + *, + option_name: str, + value: str, + target: str | int | None = None, + ) -> ServerCmd.SetOption: + """Add a ``set-option`` command object.""" + return self._batch.add( + ServerCmd.SetOption( + option_name=option_name, + value=value, + target=target, + ), + ) + + +class SessionCommands: + """Typed namespace for session command objects.""" + + def __init__(self, batch: CommandBatch) -> None: + """Store the parent batch.""" + self._batch = batch + + def new_window( + self, + *, + target: str | int | None = None, + window_name: str | None = None, + detach: bool = True, + ) -> SessionCmd.NewWindow: + """Add a ``new-window`` command object.""" + return self._batch.add( + SessionCmd.NewWindow( + target=target, + window_name=window_name, + detach=detach, + ), + ) + + +class WindowCommands: + """Typed namespace for window command objects.""" + + def __init__(self, batch: CommandBatch) -> None: + """Store the parent batch.""" + self._batch = batch + + def rename_window( + self, + *, + name: str, + target: str | int | None = None, + ) -> WindowCmd.RenameWindow: + """Add a ``rename-window`` command object.""" + return self._batch.add(WindowCmd.RenameWindow(name=name, target=target)) + + def select_layout( + self, + *, + layout: str, + target: str | int | None = None, + ) -> WindowCmd.SelectLayout: + """Add a ``select-layout`` command object.""" + return self._batch.add(WindowCmd.SelectLayout(layout=layout, target=target)) + + +class PaneCommands: + """Typed namespace for pane command objects.""" + + def __init__(self, batch: CommandBatch) -> None: + """Store the parent batch.""" + self._batch = batch + + def split_window( + self, + *, + target: str | int | None = None, + horizontal: bool = False, + percentage: int | None = None, + ) -> PaneCmd.SplitWindow: + """Add a ``split-window`` command object.""" + return self._batch.add( + PaneCmd.SplitWindow( + target=target, + horizontal=horizontal, + percentage=percentage, + ), + ) + + +def _to_sequence(command: Commandish) -> SharedCommandSequence: + if isinstance(command, SharedCommandSequence): + return command + if isinstance(command, CommandCall): + return SharedCommandSequence((command,)) + return SharedCommandSequence((command.to_call(),)) diff --git a/tests/chainable_commands_experiment/context_api.py b/tests/chainable_commands_experiment/context_api.py new file mode 100644 index 000000000..a970eaea3 --- /dev/null +++ b/tests/chainable_commands_experiment/context_api.py @@ -0,0 +1,65 @@ +"""Context-manager experiment for typed command batches.""" + +from __future__ import annotations + +import types + +from typing_extensions import Self + +from .shared import ( + CommandCall, + CommandSequence, + new_window_call, + split_window_call, +) + + +class CommandBatch: + """Accumulate explicit typed command methods inside a context manager.""" + + def __init__(self) -> None: + """Initialize an empty command batch.""" + self._calls: list[CommandCall] = [] + + def __enter__(self) -> Self: + """Enter the batch context.""" + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + traceback: types.TracebackType | None, + ) -> None: + """Leave the batch context.""" + + def add(self, call: CommandCall) -> CommandCall: + """Append a call and return it for local use.""" + self._calls.append(call) + return call + + def to_sequence(self) -> CommandSequence: + """Return the accumulated calls as an immutable sequence.""" + return CommandSequence(tuple(self._calls)) + + def new_window( + self, + *, + window_name: str | None = None, + detach: bool = True, + ) -> CommandCall: + """Add a ``new-window`` call.""" + return self.add( + new_window_call(window_name=window_name, detach=detach), + ) + + def split_window( + self, + *, + horizontal: bool = False, + percentage: int | None = None, + ) -> CommandCall: + """Add a ``split-window`` call.""" + return self.add( + split_window_call(horizontal=horizontal, percentage=percentage), + ) diff --git a/tests/chainable_commands_experiment/dag_api.py b/tests/chainable_commands_experiment/dag_api.py new file mode 100644 index 000000000..66eec449a --- /dev/null +++ b/tests/chainable_commands_experiment/dag_api.py @@ -0,0 +1,78 @@ +"""Hamilton-style function DAG experiment for command sequences.""" + +from __future__ import annotations + +import collections.abc as cabc +from dataclasses import dataclass + +from .shared import CommandCall, CommandSequence + +StepBuilder = cabc.Callable[[], CommandCall] + + +@dataclass(frozen=True, slots=True) +class CommandStep: + """Named command-producing DAG step.""" + + name: str + builder: StepBuilder + depends_on: tuple[str, ...] = () + + def build(self) -> CommandCall: + """Build this step's command call.""" + return self.builder() + + +def command_step( + name: str, + *, + depends_on: tuple[str, ...] = (), +) -> cabc.Callable[[StepBuilder], CommandStep]: + """Decorate a nullary function as a command DAG step.""" + + def decorator(function: StepBuilder) -> CommandStep: + return CommandStep(name=name, builder=function, depends_on=depends_on) + + return decorator + + +@dataclass(frozen=True, slots=True) +class CommandDag: + """Small command DAG that compiles requested outputs to a sequence.""" + + steps: tuple[CommandStep, ...] + outputs: tuple[str, ...] + + def missing_dependencies(self) -> tuple[str, ...]: + """Return dependency names absent from the DAG.""" + step_names = {step.name for step in self.steps} + missing: list[str] = [] + for step in self.steps: + for dependency in step.depends_on: + if dependency not in step_names and dependency not in missing: + missing.append(dependency) + return tuple(missing) + + def sequence(self) -> CommandSequence: + """Build a command sequence for the requested outputs.""" + missing = self.missing_dependencies() + if missing: + msg = f"missing dependencies: {', '.join(missing)}" + raise ValueError(msg) + + step_by_name = {step.name: step for step in self.steps} + ordered: list[CommandCall] = [] + visited: set[str] = set() + + def visit(name: str) -> None: + if name in visited: + return + step = step_by_name[name] + for dependency in step.depends_on: + visit(dependency) + ordered.append(step.build()) + visited.add(name) + + for output in self.outputs: + visit(output) + return CommandSequence(tuple(ordered)) diff --git a/tests/chainable_commands_experiment/decorator_api.py b/tests/chainable_commands_experiment/decorator_api.py new file mode 100644 index 000000000..d0414cd56 --- /dev/null +++ b/tests/chainable_commands_experiment/decorator_api.py @@ -0,0 +1,63 @@ +"""Django-style decorator experiment for command metadata.""" + +from __future__ import annotations + +import typing as t +from collections.abc import Callable + +from .shared import ( + CommandCall, + CommandScope, + CommandSpec, + new_window_call, + split_window_call, +) + +P = t.ParamSpec("P") + + +def tmux_command( + name: str, + *, + scope: CommandScope, + chainable: bool = True, +) -> Callable[[Callable[P, CommandCall]], Callable[P, CommandCall]]: + """Attach command metadata while preserving the callable signature.""" + spec = CommandSpec(name=name, scope=scope, chainable=chainable) + + def decorator( + function: Callable[P, CommandCall], + ) -> Callable[P, CommandCall]: + vars(function)["__tmux_command_spec__"] = spec + return function + + return decorator + + +def get_command_spec(function: object) -> CommandSpec: + """Return metadata attached by :func:`tmux_command`.""" + spec = getattr(function, "__tmux_command_spec__", None) + if isinstance(spec, CommandSpec): + return spec + msg = "callable does not have tmux command metadata" + raise TypeError(msg) + + +@tmux_command("new-window", scope="session") +def new_window( + *, + window_name: str | None = None, + detach: bool = True, +) -> CommandCall: + """Build a ``new-window`` call.""" + return new_window_call(window_name=window_name, detach=detach) + + +@tmux_command("split-window", scope="window") +def split_window( + *, + horizontal: bool = False, + percentage: int | None = None, +) -> CommandCall: + """Build a ``split-window`` call.""" + return split_window_call(horizontal=horizontal, percentage=percentage) diff --git a/tests/chainable_commands_experiment/deferred_plan_api.py b/tests/chainable_commands_experiment/deferred_plan_api.py new file mode 100644 index 000000000..86a2347f2 --- /dev/null +++ b/tests/chainable_commands_experiment/deferred_plan_api.py @@ -0,0 +1,381 @@ +"""Deferred query-command plan experiment with typed targets.""" + +from __future__ import annotations + +import collections.abc as cabc +import dataclasses +import typing as t +from dataclasses import dataclass + +from .shared import Arg, CommandCall, CommandResultLike, CommandRunner + +OrderField: t.TypeAlias = t.Literal["pane_id", "pane_index", "title"] +ResultT = t.TypeVar("ResultT") +MappedT = t.TypeVar("MappedT") + + +class NoCommandsResolved(RuntimeError): + """Raised when a deferred plan resolves to no concrete commands.""" + + +@dataclass(frozen=True, slots=True) +class PaneTarget: + """Typed tmux pane target.""" + + value: str + + @classmethod + def coerce(cls, target: str | PaneTarget) -> PaneTarget: + """Normalize raw pane target text into a typed target.""" + if isinstance(target, str): + return cls(target) + return target + + def __str__(self) -> str: + """Render as the tmux target value.""" + return self.value + + +@dataclass(frozen=True, slots=True) +class WindowTarget: + """Typed tmux window target.""" + + value: str + + @classmethod + def coerce(cls, target: str | WindowTarget) -> WindowTarget: + """Normalize raw window target text into a typed target.""" + if isinstance(target, str): + return cls(target) + return target + + def __str__(self) -> str: + """Render as the tmux target value.""" + return self.value + + +@dataclass(frozen=True, slots=True) +class SessionTarget: + """Typed tmux session target.""" + + value: str + + @classmethod + def coerce(cls, target: str | SessionTarget) -> SessionTarget: + """Normalize raw session target text into a typed target.""" + if isinstance(target, str): + return cls(target) + return target + + def __str__(self) -> str: + """Render as the tmux target value.""" + return self.value + + +class CommandValue: + """Base for command values created by deferred query plans.""" + + def to_call(self) -> CommandCall: + """Compile the command value into a shared command call.""" + raise NotImplementedError + + def argv(self) -> tuple[str, ...]: + """Render this command as tmux argv tokens.""" + return self.to_call().argv() + + +CommandLike: t.TypeAlias = CommandValue | CommandCall +IntoCommands: t.TypeAlias = CommandLike | cabc.Iterable[CommandLike] + + +@dataclass(frozen=True, slots=True) +class SendKeys(CommandValue): + """Command value for ``send-keys``.""" + + target: PaneTarget + command: str + enter: bool = False + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + args: list[Arg] = [self.command] + if self.enter: + args.append("Enter") + return CommandCall("send-keys", tuple(args), target=self.target.value) + + +@dataclass(frozen=True, slots=True) +class ResizePane(CommandValue): + """Command value for ``resize-pane``.""" + + target: PaneTarget + height: int + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + return CommandCall("resize-pane", ("-y", self.height), target=self.target.value) + + +@dataclass(frozen=True, slots=True) +class SelectLayout(CommandValue): + """Command value for ``select-layout``.""" + + target: WindowTarget + layout: str + + def to_call(self) -> CommandCall: + """Compile to a shared command call.""" + return CommandCall("select-layout", (self.layout,), target=self.target.value) + + +class BoundPaneCommands: + """Pane command namespace bound to a typed pane target.""" + + def __init__(self, target: PaneTarget) -> None: + """Store the pane target used by every command.""" + self.target = target + + def send_keys(self, command: str, *, enter: bool = False) -> SendKeys: + """Build a target-bound ``send-keys`` command.""" + return SendKeys(target=self.target, command=command, enter=enter) + + def resize_pane(self, *, height: int) -> ResizePane: + """Build a target-bound ``resize-pane`` command.""" + return ResizePane(target=self.target, height=height) + + +class BoundWindowCommands: + """Window command namespace bound to a typed window target.""" + + def __init__(self, target: WindowTarget) -> None: + """Store the window target used by every command.""" + self.target = target + + def select_layout(self, layout: str) -> SelectLayout: + """Build a target-bound ``select-layout`` command.""" + return SelectLayout(target=self.target, layout=layout) + + +@dataclass(frozen=True, slots=True) +class PaneRef: + """Typed pane row returned by the lazy pane query.""" + + pane_id: PaneTarget + window_id: WindowTarget + session_id: SessionTarget + pane_index: int + active: bool + title: str + + @property + def cmd(self) -> BoundPaneCommands: + """Return pane-scoped commands bound to this pane.""" + return BoundPaneCommands(self.pane_id) + + @property + def window(self) -> BoundWindowCommands: + """Return window-scoped commands bound to this pane's window.""" + return BoundWindowCommands(self.window_id) + + +CommandMapper: t.TypeAlias = cabc.Callable[[PaneRef], IntoCommands] + + +@dataclass(frozen=True, slots=True) +class TmuxSnapshot: + """Pure tmux state used to resolve deferred command plans.""" + + panes: tuple[PaneRef, ...] + + +class SnapshotProvider(t.Protocol): + """Object that can provide a pure tmux snapshot.""" + + def snapshot(self) -> TmuxSnapshot: + """Return a tmux snapshot.""" + ... + + +class PlanRunner(CommandRunner, SnapshotProvider, t.Protocol): + """Runner that can resolve queries and dispatch tmux commands.""" + + +SnapshotSource: t.TypeAlias = TmuxSnapshot | SnapshotProvider + + +@dataclass(frozen=True, slots=True) +class PaneQuery: + """Lazy pane query that can become a deferred command plan.""" + + active_filter: bool | None = None + ordering: OrderField | None = None + limit_count: int | None = None + + def filter(self, *, active: bool) -> PaneQuery: + """Return a query filtered by active state.""" + return dataclasses.replace(self, active_filter=active) + + def order_by(self, field: OrderField) -> PaneQuery: + """Return a query ordered by a known pane field.""" + return dataclasses.replace(self, ordering=field) + + def limit(self, count: int) -> PaneQuery: + """Return a query capped to ``count`` rows.""" + return dataclasses.replace(self, limit_count=count) + + def all(self, source: SnapshotSource) -> list[PaneRef]: + """Evaluate the query against a snapshot source.""" + rows = list(_resolve_snapshot(source).panes) + if self.active_filter is not None: + rows = [row for row in rows if row.active is self.active_filter] + if self.ordering is not None: + ordering = self.ordering + rows.sort(key=lambda row: _order_value(row, ordering)) + if self.limit_count is not None: + rows = rows[: self.limit_count] + return rows + + def first(self, source: SnapshotSource) -> PaneRef | None: + """Evaluate the query and return its first row.""" + rows = self.limit(1).all(source) + if not rows: + return None + return rows[0] + + def map( + self, + mapper: cabc.Callable[[PaneRef], MappedT], + ) -> MappedPaneQuery[MappedT]: + """Return a data transformation query.""" + return MappedPaneQuery(query=self, mapper=mapper) + + def each(self, mapper: CommandMapper) -> CommandPlan[None]: + """Return a deferred side-effect command plan.""" + return self.flat_map(mapper) + + def flat_map(self, mapper: CommandMapper) -> CommandPlan[None]: + """Return a deferred multi-command side-effect plan.""" + return CommandPlan(ForEach(query=self, mapper=mapper)) + + +@dataclass(frozen=True, slots=True) +class MappedPaneQuery(t.Generic[MappedT]): + """Data-only query transformation over pane refs.""" + + query: PaneQuery + mapper: cabc.Callable[[PaneRef], MappedT] + + def all(self, source: SnapshotSource) -> list[MappedT]: + """Evaluate the query and transform every row.""" + return [self.mapper(row) for row in self.query.all(source)] + + def first(self, source: SnapshotSource) -> MappedT | None: + """Evaluate the query and transform the first row.""" + row = self.query.first(source) + if row is None: + return None + return self.mapper(row) + + +@dataclass(frozen=True, slots=True) +class ForEach: + """Deferred query plus command mapper plan node.""" + + query: PaneQuery + mapper: CommandMapper + + +@dataclass(frozen=True, slots=True) +class CommandSequence: + """Resolved non-empty sequence of command calls.""" + + calls: tuple[CommandCall, ...] + + def __post_init__(self) -> None: + """Reject empty resolved command sequences.""" + if not self.calls: + msg = "command plan resolved to no commands" + raise NoCommandsResolved(msg) + + def argvs(self) -> tuple[tuple[str, ...], ...]: + """Render each command independently.""" + return tuple(call.argv() for call in self.calls) + + def argv(self) -> tuple[str, ...]: + """Render one native tmux semicolon command sequence.""" + rendered: list[str] = [] + for index, call in enumerate(self.calls): + if index: + rendered.append(";") + rendered.extend(call.argv()) + return tuple(rendered) + + def run(self, runner: CommandRunner) -> CommandResultLike: + """Dispatch the sequence through one runner call.""" + argv = self.argv() + return runner.cmd(argv[0], *argv[1:]) + + +@dataclass(frozen=True, slots=True) +class CommandPlan(t.Generic[ResultT]): + """Lazy command plan that resolves a query into commands.""" + + node: ForEach + + def to_sequence(self, source: SnapshotSource) -> CommandSequence: + """Resolve the query and compile mapped commands.""" + calls: list[CommandCall] = [] + for row in self.node.query.all(source): + calls.extend(_to_calls(self.node.mapper(row))) + if not calls: + msg = "command plan resolved to no commands" + raise NoCommandsResolved(msg) + return CommandSequence(tuple(calls)) + + def run(self: CommandPlan[None], runner: PlanRunner) -> None: + """Resolve, compile, and execute the plan through one tmux dispatch.""" + try: + sequence = self.to_sequence(runner) + except NoCommandsResolved: + return None + sequence.run(runner) + return None + + +def panes() -> PaneQuery: + """Start a lazy pane query.""" + return PaneQuery() + + +def _resolve_snapshot(source: SnapshotSource) -> TmuxSnapshot: + if isinstance(source, TmuxSnapshot): + return source + return source.snapshot() + + +def _order_value(row: PaneRef, field: OrderField) -> str | int: + if field == "pane_id": + return row.pane_id.value + if field == "pane_index": + return row.pane_index + return row.title + + +def _to_calls(value: IntoCommands) -> tuple[CommandCall, ...]: + if isinstance(value, CommandCall): + return (value,) + if isinstance(value, CommandValue): + return (value.to_call(),) + if isinstance(value, str | bytes): + msg = "command mapper must return a command or iterable of commands" + raise TypeError(msg) + + calls: list[CommandCall] = [] + try: + iterator = iter(value) + except TypeError as exc: + msg = "command mapper must return a command or iterable of commands" + raise TypeError(msg) from exc + for item in iterator: + calls.extend(_to_calls(item)) + return tuple(calls) diff --git a/tests/chainable_commands_experiment/descriptor_api.py b/tests/chainable_commands_experiment/descriptor_api.py new file mode 100644 index 000000000..cd9d29c62 --- /dev/null +++ b/tests/chainable_commands_experiment/descriptor_api.py @@ -0,0 +1,105 @@ +"""Typed descriptor experiment for command metadata.""" + +from __future__ import annotations + +import typing as t + +from .shared import ( + CommandCall, + CommandSpec, + new_window_call, + split_window_call, +) + + +class NewWindowCommand: + """Bound command object for ``new-window``.""" + + spec = CommandSpec(name="new-window", scope="session") + + def __call__( + self, + *, + window_name: str | None = None, + detach: bool = True, + ) -> CommandCall: + """Build a ``new-window`` call.""" + return new_window_call(window_name=window_name, detach=detach) + + +class SplitWindowCommand: + """Bound command object for ``split-window``.""" + + spec = CommandSpec(name="split-window", scope="window") + + def __call__( + self, + *, + horizontal: bool = False, + percentage: int | None = None, + ) -> CommandCall: + """Build a ``split-window`` call.""" + return split_window_call(horizontal=horizontal, percentage=percentage) + + +class NewWindowDescriptor: + """Descriptor that binds ``new-window`` command objects.""" + + @t.overload + def __get__( + self, + instance: None, + owner: type[object] | None = None, + ) -> NewWindowDescriptor: ... + + @t.overload + def __get__( + self, + instance: object, + owner: type[object] | None = None, + ) -> NewWindowCommand: ... + + def __get__( + self, + instance: object | None, + owner: type[object] | None = None, + ) -> NewWindowDescriptor | NewWindowCommand: + """Bind the descriptor to an instance.""" + if instance is None: + return self + return NewWindowCommand() + + +class SplitWindowDescriptor: + """Descriptor that binds ``split-window`` command objects.""" + + @t.overload + def __get__( + self, + instance: None, + owner: type[object] | None = None, + ) -> SplitWindowDescriptor: ... + + @t.overload + def __get__( + self, + instance: object, + owner: type[object] | None = None, + ) -> SplitWindowCommand: ... + + def __get__( + self, + instance: object | None, + owner: type[object] | None = None, + ) -> SplitWindowDescriptor | SplitWindowCommand: + """Bind the descriptor to an instance.""" + if instance is None: + return self + return SplitWindowCommand() + + +class Commands: + """Container exposing typed command descriptors.""" + + new_window = NewWindowDescriptor() + split_window = SplitWindowDescriptor() diff --git a/tests/chainable_commands_experiment/document_query_api.py b/tests/chainable_commands_experiment/document_query_api.py new file mode 100644 index 000000000..44e145afb --- /dev/null +++ b/tests/chainable_commands_experiment/document_query_api.py @@ -0,0 +1,69 @@ +"""Beanie-style document query experiment for command metadata.""" + +from __future__ import annotations + +import collections.abc as cabc +from dataclasses import dataclass + +from .shared import CommandScope + + +@dataclass(frozen=True, slots=True) +class CommandDocument: + """Document-shaped command metadata.""" + + name: str + scope: CommandScope + chainable: bool + + +class CommandDocumentQuery: + """Immutable query over command metadata documents.""" + + def __init__( + self, + documents: cabc.Iterable[CommandDocument], + *, + scope: CommandScope | None = None, + chainable: bool | None = None, + name: str | None = None, + ) -> None: + """Store the document set and filters.""" + self._documents = tuple(documents) + self._scope = scope + self._chainable = chainable + self._name = name + + def where( + self, + *, + scope: CommandScope | None = None, + chainable: bool | None = None, + ) -> CommandDocumentQuery: + """Return a query filtered by document fields.""" + return CommandDocumentQuery( + self._documents, + scope=self._scope if scope is None else scope, + chainable=self._chainable if chainable is None else chainable, + name=self._name, + ) + + def where_name(self, name: str) -> CommandDocumentQuery: + """Return a query filtered by command name.""" + return CommandDocumentQuery( + self._documents, + scope=self._scope, + chainable=self._chainable, + name=name, + ) + + def all(self) -> list[CommandDocument]: + """Return all matching command documents.""" + return [document for document in self._documents if self._matches(document)] + + def _matches(self, document: CommandDocument) -> bool: + if self._scope is not None and document.scope != self._scope: + return False + if self._chainable is not None and document.chainable is not self._chainable: + return False + return self._name is None or document.name == self._name diff --git a/tests/chainable_commands_experiment/expression_api.py b/tests/chainable_commands_experiment/expression_api.py new file mode 100644 index 000000000..49c434b24 --- /dev/null +++ b/tests/chainable_commands_experiment/expression_api.py @@ -0,0 +1,115 @@ +"""Ibis-style typed expression experiment.""" + +from __future__ import annotations + +import typing as t +from dataclasses import dataclass + +Scalar: t.TypeAlias = str | int | bool +Row: t.TypeAlias = dict[str, object] +FieldT = t.TypeVar("FieldT", bound=Scalar) + + +@dataclass(frozen=True, slots=True) +class Predicate: + """Compiled comparison against one field.""" + + field_name: str + value: Scalar + + def compile(self) -> str: + """Compile the predicate to a displayable expression.""" + return f"{self.field_name}={self.value}" + + +@dataclass(frozen=True, slots=True) +class Field(t.Generic[FieldT]): + """Typed field expression.""" + + name: str + + def eq(self, value: FieldT) -> Predicate: + """Return an equality predicate for this field.""" + return Predicate(self.name, value) + + +SelectableField: t.TypeAlias = Field[str] | Field[int] | Field[bool] + + +@dataclass(frozen=True, slots=True) +class CompiledExpression: + """Compiled table expression.""" + + fields: tuple[str, ...] + predicates: tuple[str, ...] + + +@dataclass(frozen=True, slots=True) +class TableExpression: + """Immutable table expression.""" + + predicates: tuple[Predicate, ...] = () + fields: tuple[SelectableField, ...] = () + + def where(self, predicate: Predicate) -> TableExpression: + """Return an expression with another predicate.""" + return TableExpression((*self.predicates, predicate), self.fields) + + def select(self, *fields: SelectableField) -> TableExpression: + """Return an expression with selected fields.""" + return TableExpression(self.predicates, fields) + + def compile(self) -> CompiledExpression: + """Compile fields and predicates without executing them.""" + return CompiledExpression( + fields=tuple(field.name for field in self.fields), + predicates=tuple(predicate.compile() for predicate in self.predicates), + ) + + def execute(self, runner: ExpressionRunner) -> list[Row]: + """Execute this expression against a runner.""" + return runner.execute(self) + + +class PaneTable: + """Typed pane table expression root.""" + + id = Field[str]("pane_id") + title = Field[str]("pane_title") + active = Field[bool]("pane_active") + + def where(self, predicate: Predicate) -> TableExpression: + """Start a pane expression with a predicate.""" + return TableExpression().where(predicate) + + def select(self, *fields: SelectableField) -> TableExpression: + """Start a pane expression with selected fields.""" + return TableExpression().select(*fields) + + +@dataclass(frozen=True, slots=True) +class ExpressionRunner: + """Backend runner for expression materialization.""" + + rows: tuple[Row, ...] + + def execute(self, expression: TableExpression) -> list[Row]: + """Materialize rows matching an expression.""" + selected: list[Row] = [] + for row in self.rows: + if not _matches(row, expression.predicates): + continue + selected.append( + { + field.name: row[field.name] + for field in expression.fields + if field.name in row + }, + ) + return selected + + +def _matches(row: Row, predicates: tuple[Predicate, ...]) -> bool: + return all( + row.get(predicate.field_name) == predicate.value for predicate in predicates + ) diff --git a/tests/chainable_commands_experiment/generated_client_api.py b/tests/chainable_commands_experiment/generated_client_api.py new file mode 100644 index 000000000..5c284d1fd --- /dev/null +++ b/tests/chainable_commands_experiment/generated_client_api.py @@ -0,0 +1,30 @@ +"""Prisma-style generated command client experiment.""" + +from __future__ import annotations + +from .shared import CommandCall, new_window_call + + +class GeneratedSessionCommands: + """Generated session command namespace.""" + + def new_window(self, *, name: str, detach: bool = True) -> CommandCall: + """Build a typed ``new-window`` command.""" + return new_window_call(window_name=name, detach=detach) + + +class GeneratedWindowCommands: + """Generated window command namespace.""" + + def rename(self, *, target: str, name: str) -> CommandCall: + """Build a typed ``rename-window`` command.""" + return CommandCall("rename-window", (name,), target=target) + + +class GeneratedCommands: + """Generated root command client.""" + + def __init__(self) -> None: + """Initialize generated namespaces.""" + self.session = GeneratedSessionCommands() + self.window = GeneratedWindowCommands() diff --git a/tests/chainable_commands_experiment/lazy_plan_api.py b/tests/chainable_commands_experiment/lazy_plan_api.py new file mode 100644 index 000000000..a1e7385fa --- /dev/null +++ b/tests/chainable_commands_experiment/lazy_plan_api.py @@ -0,0 +1,82 @@ +"""Polars-style lazy command plan experiment.""" + +from __future__ import annotations + +import dataclasses +import typing as t +from dataclasses import dataclass + +from .shared import CommandCall, CommandScope + +SelectedField: t.TypeAlias = t.Literal["name", "target", "scope"] + + +@dataclass(frozen=True, slots=True) +class PlannedCall: + """Command call with enough metadata for lazy filtering.""" + + scope: CommandScope + call: CommandCall + + +@dataclass(frozen=True, slots=True) +class CommandRow: + """Collected command-row projection.""" + + name: str + target: str | int | None = None + scope: CommandScope | None = None + + +@dataclass(frozen=True, slots=True) +class LazyCommandPlan: + """Immutable lazy command-plan graph.""" + + planned_calls: tuple[PlannedCall, ...] + scope_filter: CommandScope | None = None + selected_fields: tuple[SelectedField, ...] = () + + @classmethod + def from_calls(cls, calls: tuple[PlannedCall, ...]) -> LazyCommandPlan: + """Create a lazy plan from planned calls.""" + return cls(calls) + + def filter_scope(self, scope: CommandScope) -> LazyCommandPlan: + """Return a plan filtered by command scope.""" + return dataclasses.replace(self, scope_filter=scope) + + def select(self, *fields: SelectedField) -> LazyCommandPlan: + """Return a plan with selected output fields.""" + return dataclasses.replace(self, selected_fields=fields) + + def optimize(self) -> LazyCommandPlan: + """Return the optimized plan boundary.""" + return self + + def explain(self) -> tuple[str, ...]: + """Render a tiny explanation of the lazy operations.""" + steps: list[str] = [] + if self.scope_filter is not None: + steps.append(f"filter_scope={self.scope_filter}") + if self.selected_fields: + steps.append(f"select={','.join(self.selected_fields)}") + return tuple(steps) + + def collect(self) -> list[CommandRow]: + """Materialize the lazy plan into command rows.""" + rows: list[CommandRow] = [] + for planned_call in self.planned_calls: + if ( + self.scope_filter is not None + and planned_call.scope != self.scope_filter + ): + continue + scope = planned_call.scope if "scope" in self.selected_fields else None + rows.append( + CommandRow( + name=planned_call.call.name, + target=planned_call.call.target, + scope=scope, + ), + ) + return rows diff --git a/tests/chainable_commands_experiment/orchestration_api.py b/tests/chainable_commands_experiment/orchestration_api.py new file mode 100644 index 000000000..1063da991 --- /dev/null +++ b/tests/chainable_commands_experiment/orchestration_api.py @@ -0,0 +1,44 @@ +"""Prefect-style orchestration experiment for deferred command tasks.""" + +from __future__ import annotations + +import collections.abc as cabc +from dataclasses import dataclass + +from .shared import CommandCall, CommandSequence + +RenameArguments = tuple[str, str] +RenameTask = cabc.Callable[[str, str], CommandCall] + + +@dataclass(frozen=True, slots=True) +class SubmittedCommand: + """Deferred command submission.""" + + call: CommandCall + + +class CommandTask: + """Typed command task with submit and map-style helpers.""" + + def __init__(self, function: RenameTask) -> None: + """Store a typed command factory.""" + self._function = function + + def submit(self, target: str, name: str) -> SubmittedCommand: + """Submit one command for later sequencing.""" + return SubmittedCommand(self._function(target, name)) + + def map(self, arguments: cabc.Iterable[RenameArguments]) -> list[SubmittedCommand]: + """Submit many commands from an iterable of argument tuples.""" + return [self.submit(target, name) for target, name in arguments] + + +def rename_window(target: str, name: str) -> CommandCall: + """Build a target-aware ``rename-window`` command.""" + return CommandCall("rename-window", (name,), target=target) + + +def submitted_sequence(submitted: cabc.Sequence[SubmittedCommand]) -> CommandSequence: + """Collapse submitted commands to a native tmux sequence.""" + return CommandSequence(tuple(item.call for item in submitted)) diff --git a/tests/chainable_commands_experiment/queryset_api.py b/tests/chainable_commands_experiment/queryset_api.py new file mode 100644 index 000000000..768e67022 --- /dev/null +++ b/tests/chainable_commands_experiment/queryset_api.py @@ -0,0 +1,78 @@ +"""Django QuerySet-style lazy query experiment.""" + +from __future__ import annotations + +import dataclasses +import typing as t +from dataclasses import dataclass + +OrderField: t.TypeAlias = t.Literal["pane_id", "pane_index", "title"] + + +@dataclass(frozen=True, slots=True) +class PaneRow: + """Small pane row returned by the query demo.""" + + pane_id: str + pane_index: int + active: bool + title: str + + +@dataclass(frozen=True, slots=True) +class StaticPaneRunner: + """Runner exposing fixed pane rows.""" + + rows: tuple[PaneRow, ...] + + def list_panes(self) -> list[PaneRow]: + """Return pane rows for query evaluation.""" + return list(self.rows) + + +@dataclass(frozen=True, slots=True) +class PaneQuery: + """Lazy, immutable pane query.""" + + active_filter: bool | None = None + ordering: OrderField | None = None + limit_count: int | None = None + + def filter(self, *, active: bool) -> PaneQuery: + """Return a query filtered by active state.""" + return dataclasses.replace(self, active_filter=active) + + def order_by(self, field: OrderField) -> PaneQuery: + """Return a query ordered by a known pane field.""" + return dataclasses.replace(self, ordering=field) + + def limit(self, count: int) -> PaneQuery: + """Return a query capped to ``count`` rows.""" + return dataclasses.replace(self, limit_count=count) + + def all(self, runner: StaticPaneRunner) -> list[PaneRow]: + """Evaluate the query and return all matching rows.""" + rows = runner.list_panes() + if self.active_filter is not None: + rows = [row for row in rows if row.active is self.active_filter] + if self.ordering is not None: + ordering = self.ordering + rows.sort(key=lambda row: _order_value(row, ordering)) + if self.limit_count is not None: + rows = rows[: self.limit_count] + return rows + + def first(self, runner: StaticPaneRunner) -> PaneRow | None: + """Evaluate the query and return its first row.""" + rows = self.limit(1).all(runner) + if not rows: + return None + return rows[0] + + +def _order_value(row: PaneRow, field: OrderField) -> str | int: + if field == "pane_id": + return row.pane_id + if field == "pane_index": + return row.pane_index + return row.title diff --git a/tests/chainable_commands_experiment/runnable_api.py b/tests/chainable_commands_experiment/runnable_api.py new file mode 100644 index 000000000..8234ae118 --- /dev/null +++ b/tests/chainable_commands_experiment/runnable_api.py @@ -0,0 +1,111 @@ +"""LangChain-style runnable experiment for command composition.""" + +from __future__ import annotations + +import collections.abc as cabc +import typing as t +from dataclasses import dataclass, field + +from .shared import Arg, CommandCall, CommandRunner + +InputT = t.TypeVar("InputT") +OutputT = t.TypeVar("OutputT") +NextT = t.TypeVar("NextT") + + +@dataclass(slots=True) +class RunRecord: + """Materialized result from a runnable command dispatch.""" + + command: str + args: tuple[Arg, ...] + target: str | int | None + stdout: list[str] = field(default_factory=list) + stderr: list[str] = field(default_factory=list) + returncode: int = 0 + + +class RunnableCommand(t.Generic[InputT, OutputT]): + """Small invokable, batchable, composable command unit.""" + + def __init__( + self, + invoke: cabc.Callable[[InputT, CommandRunner], OutputT], + ) -> None: + """Store the typed invocation function.""" + self._invoke = invoke + + def invoke(self, input_value: InputT, runner: CommandRunner) -> OutputT: + """Run the command unit for one input.""" + return self._invoke(input_value, runner) + + def batch( + self, + input_values: cabc.Iterable[InputT], + runner: CommandRunner, + ) -> list[OutputT]: + """Run the command unit for many inputs.""" + return [self.invoke(input_value, runner) for input_value in input_values] + + def stream( + self, + input_value: InputT, + runner: CommandRunner, + ) -> cabc.Iterator[OutputT]: + """Yield the single result through a streaming-shaped API.""" + yield self.invoke(input_value, runner) + + def then( + self, + next_command: RunnableCommand[OutputT, NextT], + ) -> RunnableCommand[InputT, NextT]: + """Compose this runnable with a following runnable.""" + + def invoke(input_value: InputT, runner: CommandRunner) -> NextT: + return next_command.invoke(self.invoke(input_value, runner), runner) + + return RunnableCommand(invoke) + + def __rshift__( + self, + next_command: RunnableCommand[OutputT, NextT], + ) -> RunnableCommand[InputT, NextT]: + """Compose runnable commands with ``>>``.""" + return self.then(next_command) + + +def target_capture_call() -> RunnableCommand[str, CommandCall]: + """Build a runnable that maps a target pane to a capture command.""" + + def invoke(target: str, runner: CommandRunner) -> CommandCall: + del runner + return CommandCall("capture-pane", ("-p",), target=target) + + return RunnableCommand(invoke) + + +def run_command() -> RunnableCommand[CommandCall, RunRecord]: + """Build a runnable that dispatches a command call.""" + + def invoke(call: CommandCall, runner: CommandRunner) -> RunRecord: + result = runner.cmd(call.name, *call.args, target=call.target) + return RunRecord( + command=call.name, + args=call.args, + target=call.target, + stdout=result.stdout, + stderr=result.stderr, + returncode=result.returncode, + ) + + return RunnableCommand(invoke) + + +def render_argv() -> RunnableCommand[CommandCall, tuple[str, ...]]: + """Build a runnable that renders a command call without dispatching it.""" + + def invoke(call: CommandCall, runner: CommandRunner) -> tuple[str, ...]: + del runner + return call.argv() + + return RunnableCommand(invoke) diff --git a/tests/chainable_commands_experiment/selection_api.py b/tests/chainable_commands_experiment/selection_api.py new file mode 100644 index 000000000..40709e348 --- /dev/null +++ b/tests/chainable_commands_experiment/selection_api.py @@ -0,0 +1,75 @@ +"""GraphQL-style nested selection experiment.""" + +from __future__ import annotations + +import dataclasses +from dataclasses import dataclass + +ScopeName = str +FieldName = str +SelectionPayload = dict[str, tuple[str, ...]] + + +@dataclass(frozen=True, slots=True) +class SelectionPlan: + """Compiled nested selection plan.""" + + scopes: tuple[ScopeName, ...] + fields: tuple[FieldName, ...] + + +@dataclass(frozen=True, slots=True) +class SelectionQuery: + """Immutable nested tmux selection query.""" + + scopes: tuple[ScopeName, ...] + selected_fields: tuple[FieldName, ...] = () + + def sessions(self) -> SelectionQuery: + """Select session depth.""" + return self._include("session") + + def windows(self) -> SelectionQuery: + """Select window depth.""" + return self._include("window") + + def panes(self) -> SelectionQuery: + """Select pane depth.""" + return self._include("pane") + + def fields(self, *field_names: FieldName) -> SelectionQuery: + """Select fields at the current depth.""" + return dataclasses.replace(self, selected_fields=field_names) + + def compile(self) -> SelectionPlan: + """Compile the nested selection.""" + return SelectionPlan(self.scopes, self.selected_fields) + + def run(self, runner: StaticSelectionRunner) -> SelectionPayload: + """Execute the nested selection through a runner.""" + return runner.run(self.compile()) + + def _include(self, scope: ScopeName) -> SelectionQuery: + return dataclasses.replace(self, scopes=(*self.scopes, scope)) + + +class TmuxSelection: + """Selection root namespace.""" + + @staticmethod + def server() -> SelectionQuery: + """Start at server depth.""" + return SelectionQuery(("server",)) + + +@dataclass(frozen=True, slots=True) +class StaticSelectionRunner: + """Runner returning fixed field payloads.""" + + values: SelectionPayload + + def run(self, plan: SelectionPlan) -> SelectionPayload: + """Return values requested by a compiled plan.""" + return { + field: self.values[field] for field in plan.fields if field in self.values + } diff --git a/tests/chainable_commands_experiment/shared.py b/tests/chainable_commands_experiment/shared.py new file mode 100644 index 000000000..c2b5f1569 --- /dev/null +++ b/tests/chainable_commands_experiment/shared.py @@ -0,0 +1,153 @@ +"""Shared typed command-sequence primitives for the experiments.""" + +from __future__ import annotations + +import typing as t +from dataclasses import dataclass + +Arg: t.TypeAlias = str | int +CommandScope: t.TypeAlias = t.Literal["server", "session", "window", "pane"] + + +class CommandResultLike(t.Protocol): + """Small result protocol matching the libtmux command result surface.""" + + stdout: list[str] + stderr: list[str] + returncode: int + + +class CommandRunner(t.Protocol): + """Object capable of dispatching one tmux command argv.""" + + def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> CommandResultLike: + """Dispatch a tmux command.""" + ... + + +@dataclass(frozen=True, slots=True) +class CommandSpec: + """Static metadata for a tmux command factory.""" + + name: str + scope: CommandScope + chainable: bool = True + + +@dataclass(frozen=True, slots=True) +class CommandCall: + """One typed tmux command call before subprocess dispatch.""" + + name: str + args: tuple[Arg, ...] = () + target: str | int | None = None + + def argv(self) -> tuple[str, ...]: + """Render this call as tmux argv tokens.""" + rendered: list[str] = [self.name] + if self.target is not None: + rendered.extend(("-t", str(self.target))) + rendered.extend(_render_arg(arg) for arg in self.args) + return tuple(rendered) + + def then(self, other: CommandCall | CommandSequence) -> CommandSequence: + """Return a sequence with another call or sequence appended.""" + if isinstance(other, CommandCall): + return CommandSequence((self, other)) + return CommandSequence((self, *other.calls)) + + def __rshift__(self, other: CommandCall | CommandSequence) -> CommandSequence: + """Compose command calls with ``>>``.""" + return self.then(other) + + +@dataclass(frozen=True, slots=True) +class CommandSequence: + """Ordered native tmux command sequence.""" + + calls: tuple[CommandCall, ...] + + def __post_init__(self) -> None: + """Reject empty sequences.""" + if not self.calls: + msg = "CommandSequence requires at least one call" + raise ValueError(msg) + + def argv(self) -> tuple[str, ...]: + """Render the full sequence with tmux semicolon separators.""" + rendered: list[str] = [] + for index, call in enumerate(self.calls): + if index: + rendered.append(";") + rendered.extend(call.argv()) + return tuple(rendered) + + def then(self, other: CommandCall | CommandSequence) -> CommandSequence: + """Return a sequence with another call or sequence appended.""" + if isinstance(other, CommandCall): + return CommandSequence((*self.calls, other)) + return CommandSequence((*self.calls, *other.calls)) + + def __rshift__(self, other: CommandCall | CommandSequence) -> CommandSequence: + """Compose command sequences with ``>>``.""" + return self.then(other) + + def run(self, runner: CommandRunner) -> CommandResultLike: + """Dispatch the sequence through one runner call.""" + argv = self.argv() + return runner.cmd(argv[0], *argv[1:]) + + +def new_window_call( + window_name: str | None = None, + *, + detach: bool = True, +) -> CommandCall: + """Build a ``new-window`` call.""" + args: list[Arg] = [] + if detach: + args.append("-d") + if window_name is not None: + args.extend(("-n", window_name)) + return CommandCall("new-window", tuple(args)) + + +def split_window_call( + *, + horizontal: bool = False, + percentage: int | None = None, +) -> CommandCall: + """Build a ``split-window`` call.""" + args: list[Arg] = [] + if horizontal: + args.append("-h") + if percentage is not None: + args.extend(("-p", percentage)) + return CommandCall("split-window", tuple(args)) + + +def rename_window_call(new_name: str) -> CommandCall: + """Build a ``rename-window`` call.""" + return CommandCall("rename-window", (new_name,)) + + +def select_layout_call(layout: str) -> CommandCall: + """Build a ``select-layout`` call.""" + return CommandCall("select-layout", (layout,)) + + +def show_option_call(option_name: str) -> CommandCall: + """Build a ``show-option`` call.""" + return CommandCall("show-option", ("-gqv", option_name)) + + +def _render_arg(arg: Arg) -> str: + text = str(arg) + if text.endswith(";"): + return f"{text[:-1]}\\;" + return text diff --git a/tests/chainable_commands_experiment/statement_api.py b/tests/chainable_commands_experiment/statement_api.py new file mode 100644 index 000000000..b4ebc74f1 --- /dev/null +++ b/tests/chainable_commands_experiment/statement_api.py @@ -0,0 +1,54 @@ +"""SQLAlchemy-style generative command statement experiment.""" + +from __future__ import annotations + +import dataclasses +from dataclasses import dataclass, field + +from .shared import Arg, CommandCall + + +@dataclass(frozen=True, slots=True) +class CommandStatement: + """Immutable command statement executed later by a runner.""" + + name: str + args: tuple[Arg, ...] = () + target_value: str | int | None = None + + def target(self, target: str | int) -> CommandStatement: + """Return a statement with a tmux target.""" + return dataclasses.replace(self, target_value=target) + + def flag(self, name: str, value: Arg | None = None) -> CommandStatement: + """Return a statement with a command flag.""" + args = (*self.args, name) if value is None else (*self.args, name, value) + return dataclasses.replace(self, args=args) + + def arg(self, value: Arg) -> CommandStatement: + """Return a statement with one positional argument.""" + return dataclasses.replace(self, args=(*self.args, value)) + + def to_call(self) -> CommandCall: + """Compile the statement to the shared command-call IR.""" + return CommandCall(self.name, self.args, target=self.target_value) + + +@dataclass(frozen=True, slots=True) +class StatementResult: + """Recorded statement execution result.""" + + argv: tuple[str, ...] + + +@dataclass +class StatementRunner: + """Runner that records executed statements.""" + + executed: list[tuple[str, ...]] = field(default_factory=list) + + def execute(self, statement: CommandStatement) -> StatementResult: + """Execute a statement at the explicit boundary.""" + argv = statement.to_call().argv() + self.executed.append(argv) + return StatementResult(argv) diff --git a/tests/chainable_commands_experiment/test_ast_api.py b/tests/chainable_commands_experiment/test_ast_api.py new file mode 100644 index 000000000..ef3322a86 --- /dev/null +++ b/tests/chainable_commands_experiment/test_ast_api.py @@ -0,0 +1,62 @@ +"""Tests for an AST-assisted command API experiment.""" + +from __future__ import annotations + +import pytest +from typing_extensions import assert_type + +from . import ast_api as api +from .shared import CommandCall + + +def _script( + proxy: api.AstCommandProxy, +) -> tuple[CommandCall, CommandCall]: + """Return a statically visible tuple of proxy command calls.""" + return ( + proxy.new_window(window_name="work"), + proxy.split_window(horizontal=True, percentage=50), + ) + + +def _unsupported_loop(proxy: api.AstCommandProxy) -> CommandCall: + """Use control flow that the AST prototype deliberately rejects.""" + for _ in range(1): + return proxy.new_window(window_name="work") + msg = "unreachable" + raise AssertionError(msg) + + +def test_ast_api_proxy_methods_remain_typed() -> None: + """The proxy itself can still provide normal completion.""" + proxy = api.AstCommandProxy() + + assert_type(proxy.new_window(window_name="work"), CommandCall) + + +def test_ast_api_extracts_supported_call_names() -> None: + """The AST helper discovers the simple command-call shape.""" + assert api.command_names_from_ast(_script) == ("new_window", "split_window") + + +def test_ast_api_builds_chain_by_executing_typed_proxy() -> None: + """The AST API validates shape, then accumulates calls via a typed proxy.""" + chain = api.from_callable(_script) + + assert chain.argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ) + + +def test_ast_api_rejects_control_flow() -> None: + """The prototype is intentionally conservative about AST shapes.""" + with pytest.raises(api.UnsupportedAstShape): + api.command_names_from_ast(_unsupported_loop) diff --git a/tests/chainable_commands_experiment/test_async_deferred_plan_api.py b/tests/chainable_commands_experiment/test_async_deferred_plan_api.py new file mode 100644 index 000000000..9d0c2eaa1 --- /dev/null +++ b/tests/chainable_commands_experiment/test_async_deferred_plan_api.py @@ -0,0 +1,184 @@ +"""Tests for an asyncio facade over deferred command plans.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field + +import pytest +from typing_extensions import assert_type + +from . import async_deferred_plan_api as api, deferred_plan_api as sync_api +from .shared import Arg + + +@dataclass +class _FakeResult: + """Small command result for async deferred-plan runner tests.""" + + stdout: list[str] = field(default_factory=list) + stderr: list[str] = field(default_factory=list) + returncode: int = 0 + + +@dataclass +class _AsyncFakeRunner: + """Async runner that exposes a snapshot and records tmux dispatches.""" + + snapshot_value: sync_api.TmuxSnapshot + calls: list[tuple[str, tuple[Arg, ...], str | int | None]] = field( + default_factory=list, + ) + snapshot_calls: int = 0 + + async def snapshot(self) -> sync_api.TmuxSnapshot: + """Return the fixed tmux snapshot asynchronously.""" + await asyncio.sleep(0) + self.snapshot_calls += 1 + return self.snapshot_value + + async def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> _FakeResult: + """Record one async command dispatch.""" + await asyncio.sleep(0) + self.calls.append((cmd, args, target)) + return _FakeResult(stdout=["async ok"]) + + +def _snapshot() -> sync_api.TmuxSnapshot: + return sync_api.TmuxSnapshot( + panes=( + sync_api.PaneRef( + pane_id=sync_api.PaneTarget("%2"), + window_id=sync_api.WindowTarget("@1"), + session_id=sync_api.SessionTarget("$0"), + pane_index=2, + active=True, + title="shell", + ), + sync_api.PaneRef( + pane_id=sync_api.PaneTarget("%1"), + window_id=sync_api.WindowTarget("@1"), + session_id=sync_api.SessionTarget("$0"), + pane_index=1, + active=True, + title="editor", + ), + sync_api.PaneRef( + pane_id=sync_api.PaneTarget("%3"), + window_id=sync_api.WindowTarget("@2"), + session_id=sync_api.SessionTarget("$0"), + pane_index=3, + active=False, + title="logs", + ), + ), + ) + + +def test_async_to_sequence_awaits_snapshot_without_dispatching() -> None: + """Async plan inspection preserves the pure command assertion workflow.""" + + async def scenario() -> None: + runner = _AsyncFakeRunner(_snapshot()) + plan = ( + api.panes() + .filter(active=True) + .order_by("pane_index") + .each( + lambda pane: [ + pane.cmd.send_keys("clear", enter=True), + pane.window.select_layout("even-horizontal"), + ], + ) + ) + + sequence = await plan.to_sequence(runner) + + assert_type(plan, api.CommandPlan[None]) + assert_type(sequence, api.CommandSequence) + assert sequence.argvs() == ( + ("send-keys", "-t", "%1", "clear", "Enter"), + ("select-layout", "-t", "@1", "even-horizontal"), + ("send-keys", "-t", "%2", "clear", "Enter"), + ("select-layout", "-t", "@1", "even-horizontal"), + ) + assert runner.snapshot_calls == 1 + assert runner.calls == [] + + asyncio.run(scenario()) + + +def test_async_run_dispatches_one_native_tmux_sequence() -> None: + """Async execution still batches concrete commands into one tmux call.""" + + async def scenario() -> None: + runner = _AsyncFakeRunner(_snapshot()) + plan = ( + api.panes() + .filter(active=True) + .order_by("pane_index") + .each(lambda pane: pane.cmd.resize_pane(height=20)) + ) + + await plan.run(runner) + + assert runner.calls == [ + ( + "resize-pane", + ( + "-t", + "%1", + "-y", + "20", + ";", + "resize-pane", + "-t", + "%2", + "-y", + "20", + ), + None, + ), + ] + + asyncio.run(scenario()) + + +def test_async_map_and_first_are_data_only() -> None: + """Async row transforms stay separate from command construction.""" + + async def scenario() -> None: + runner = _AsyncFakeRunner(_snapshot()) + query = api.panes().filter(active=True).order_by("pane_index") + + titles = await query.map(lambda pane: pane.title).all(runner) + first = await query.first(runner) + + assert_type(titles, list[str]) + assert_type(first, sync_api.PaneRef | None) + assert titles == ["editor", "shell"] + assert first == _snapshot().panes[1] + assert runner.calls == [] + + asyncio.run(scenario()) + + +def test_async_empty_plan_to_sequence_raises_but_run_is_noop() -> None: + """Async empty plans match the sync no-op execution behavior.""" + + async def scenario() -> None: + runner = _AsyncFakeRunner(sync_api.TmuxSnapshot(panes=())) + plan = api.panes().each(lambda pane: pane.cmd.resize_pane(height=20)) + + with pytest.raises(api.NoCommandsResolved): + await plan.to_sequence(runner) + + await plan.run(runner) + assert runner.calls == [] + + asyncio.run(scenario()) diff --git a/tests/chainable_commands_experiment/test_async_query_api.py b/tests/chainable_commands_experiment/test_async_query_api.py new file mode 100644 index 000000000..0840796ee --- /dev/null +++ b/tests/chainable_commands_experiment/test_async_query_api.py @@ -0,0 +1,27 @@ +"""Tests for a Piccolo-style async command query API.""" + +from __future__ import annotations + +import asyncio + +from typing_extensions import assert_type + +from . import async_query_api as api + + +def test_async_query_api_runs_all_and_first() -> None: + """Async query terminal methods mirror sync query ergonomics.""" + runner = api.AsyncPaneRunner( + ( + api.AsyncPaneRow("%1", active=True), + api.AsyncPaneRow("%2", active=False), + ), + ) + query = api.AsyncPaneQuery().where(active=True).limit(1) + + rows = asyncio.run(query.all(runner)) + first = asyncio.run(query.first(runner)) + + assert_type(query, api.AsyncPaneQuery) + assert rows == [api.AsyncPaneRow("%1", active=True)] + assert first == api.AsyncPaneRow("%1", active=True) diff --git a/tests/chainable_commands_experiment/test_auto_batch_api.py b/tests/chainable_commands_experiment/test_auto_batch_api.py new file mode 100644 index 000000000..9695ec5f5 --- /dev/null +++ b/tests/chainable_commands_experiment/test_auto_batch_api.py @@ -0,0 +1,35 @@ +"""Tests for transparent auto-batch experiments.""" + +from __future__ import annotations + +import pytest +from typing_extensions import assert_type + +from . import auto_batch_api as api + + +def test_auto_batch_api_batches_self_returning_methods() -> None: + """Self-returning methods can be batched without needing command output.""" + target = api.AutoBatchTarget() + + returned = target.rename_window("work").select_layout("even-horizontal") + + assert_type(returned, api.AutoBatchTarget) + assert returned is target + assert target.to_sequence().argv() == ( + "rename-window", + "work", + ";", + "select-layout", + "even-horizontal", + ) + + +def test_auto_batch_api_rejects_methods_that_need_deferred_output() -> None: + """Transparent batching cannot satisfy immediate stdout access.""" + target = api.AutoBatchTarget() + + with pytest.raises(api.DeferredOutputUnavailable): + target.show_option("@missing") + + assert target.to_sequence().argv() == ("show-option", "-gqv", "@missing") diff --git a/tests/chainable_commands_experiment/test_builder_api.py b/tests/chainable_commands_experiment/test_builder_api.py new file mode 100644 index 000000000..7cb0b1007 --- /dev/null +++ b/tests/chainable_commands_experiment/test_builder_api.py @@ -0,0 +1,36 @@ +"""Tests for an explicit typed command builder API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import builder_api as api +from .shared import CommandCall, CommandSequence + + +def test_builder_api_exposes_completion_friendly_methods() -> None: + """The builder uses plain methods with explicit signatures.""" + call = api.commands.new_window(window_name="work") + + assert_type(call, CommandCall) + assert call.argv() == ("new-window", "-d", "-n", "work") + + +def test_builder_api_supports_q_like_sequence_expression() -> None: + """A Q-like immutable expression can represent ordered command sequences.""" + expr = api.sequence( + api.commands.new_window(window_name="work"), + ) >> api.commands.split_window(horizontal=True, percentage=50) + + assert_type(expr, CommandSequence) + assert expr.argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ) diff --git a/tests/chainable_commands_experiment/test_command_object_api.py b/tests/chainable_commands_experiment/test_command_object_api.py new file mode 100644 index 000000000..a31fb6253 --- /dev/null +++ b/tests/chainable_commands_experiment/test_command_object_api.py @@ -0,0 +1,276 @@ +"""Tests for explicit command-object API ergonomics.""" + +from __future__ import annotations + +import typing as t +from dataclasses import dataclass, field + +import pytest +from typing_extensions import assert_type + +from libtmux import Session + +from . import command_object_api as api +from .shared import Arg, CommandCall + + +class CommandObjectArgvCase(t.NamedTuple): + """Test fixture for command object argv rendering.""" + + test_id: str + command: api.CommandValue + expected_argv: tuple[str, ...] + + +COMMAND_OBJECT_ARGV_CASES: list[CommandObjectArgvCase] = [ + CommandObjectArgvCase( + test_id="pane_split_window_target_and_percentage", + command=api.PaneCmd.SplitWindow( + target="%1", + horizontal=True, + percentage=50, + ), + expected_argv=("split-window", "-t", "%1", "-h", "-p", "50"), + ), + CommandObjectArgvCase( + test_id="pane_capture_print_output", + command=api.PaneCmd.CapturePane( + target="%1", + print_output=True, + ), + expected_argv=("capture-pane", "-t", "%1", "-p"), + ), + CommandObjectArgvCase( + test_id="window_rename_target_keyword", + command=api.WindowCmd.RenameWindow( + target="@1", + name="editor", + ), + expected_argv=("rename-window", "-t", "@1", "editor"), + ), + CommandObjectArgvCase( + test_id="session_new_window_keyword_options", + command=api.SessionCmd.NewWindow( + target="$1", + window_name="work", + detach=True, + ), + expected_argv=("new-window", "-t", "$1", "-d", "-n", "work"), + ), + CommandObjectArgvCase( + test_id="server_show_option_keyword", + command=api.ServerCmd.ShowOption( + option_name="@demo", + ), + expected_argv=("show-option", "-gqv", "@demo"), + ), +] + + +@dataclass +class _FakeResult: + """Small command result for command-object runner tests.""" + + stdout: list[str] = field(default_factory=list) + stderr: list[str] = field(default_factory=list) + returncode: int = 0 + + +@dataclass +class _FakeRunner: + """Record command-object dispatches.""" + + calls: list[tuple[str, tuple[Arg, ...], str | int | None]] = field( + default_factory=list, + ) + + def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> _FakeResult: + """Record one command dispatch.""" + self.calls.append((cmd, args, target)) + return _FakeResult(stdout=["ok"]) + + +@pytest.mark.parametrize( + list(CommandObjectArgvCase._fields), + COMMAND_OBJECT_ARGV_CASES, + ids=[case.test_id for case in COMMAND_OBJECT_ARGV_CASES], +) +def test_command_objects_are_constructed_without_running( + test_id: str, + command: api.CommandValue, + expected_argv: tuple[str, ...], +) -> None: + """Command objects expose pure argv rendering before side effects.""" + assert test_id + assert command.argv() == expected_argv + assert command.to_call().argv() == expected_argv + + +def test_command_objects_preserve_concrete_types_and_metadata() -> None: + """Concrete command classes keep completion-friendly types and metadata.""" + command = api.PaneCmd.SplitWindow( + target="%1", + horizontal=True, + percentage=50, + ) + + assert_type(command, api.PaneCmd.SplitWindow) + assert_type(command.to_call(), CommandCall) + assert command.spec.name == "split-window" + assert command.spec.scope == "pane" + + +def test_single_command_object_runs_through_runner_boundary() -> None: + """A single command value can run only at the explicit boundary.""" + runner = _FakeRunner() + command = api.PaneCmd.SplitWindow( + target="%1", + horizontal=True, + percentage=50, + ) + + result = command.run(runner) + + assert result.stdout == ["ok"] + assert runner.calls == [ + ( + "split-window", + ("-h", "-p", 50), + "%1", + ), + ] + + +def test_command_objects_compose_to_native_sequence() -> None: + """Command objects compose into the shared semicolon sequence IR.""" + sequence = ( + api.SessionCmd.NewWindow(window_name="work") + .then(api.PaneCmd.SplitWindow(horizontal=True, percentage=50)) + .then(api.WindowCmd.SelectLayout(layout="even-horizontal")) + ) + + assert_type(sequence, api.CommandObjectSequence) + assert sequence.argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ";", + "select-layout", + "even-horizontal", + ) + + +def test_command_object_sequence_runs_as_one_runner_call() -> None: + """Running a sequence batches command objects into one tmux dispatch.""" + runner = _FakeRunner() + sequence = api.CommandSequenceBuilder( + api.SessionCmd.NewWindow(window_name="work"), + api.PaneCmd.SplitWindow(horizontal=True, percentage=50), + api.WindowCmd.SelectLayout(layout="even-horizontal"), + ) + + result = sequence.run(runner) + + assert result.stdout == ["ok"] + assert runner.calls == [ + ( + "new-window", + ( + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ";", + "select-layout", + "even-horizontal", + ), + None, + ), + ] + + +def test_command_object_batch_keeps_keyword_arg_completion() -> None: + """Command namespaces provide ergonomic batching without hidden effects.""" + with api.CommandBatch() as commands: + new_window = commands.session.new_window(window_name="work") + split_window = commands.pane.split_window( + horizontal=True, + percentage=50, + ) + commands.window.select_layout(layout="even-horizontal") + + assert_type(new_window, api.SessionCmd.NewWindow) + assert_type(split_window, api.PaneCmd.SplitWindow) + assert commands.to_sequence().argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ";", + "select-layout", + "even-horizontal", + ) + + +def test_command_object_sequence_runs_against_tmux(session: Session) -> None: + """A command-object sequence can still use live tmux integration coverage.""" + key = "@libtmux_command_object_pattern_success" + sequence = api.ServerCmd.SetOption( + option_name=key, + value="before", + ).then( + api.ServerCmd.SetOption( + option_name=key, + value="after", + ), + ) + + result = sequence.run(session.server) + readback = api.ServerCmd.ShowOption(option_name=key).run(session.server) + + assert result.returncode == 0 + assert readback.stdout == ["after"] + + +def test_command_object_sequence_stops_after_tmux_error(session: Session) -> None: + """Native tmux sequencing semantics remain testable at integration level.""" + key = "@libtmux_command_object_pattern_error" + sequence = ( + api.ServerCmd.SetOption( + option_name=key, + value="before", + ) + .then(api.ServerCmd.HasSession(session_name="definitely_missing_session")) + .then( + api.ServerCmd.SetOption( + option_name=key, + value="after", + ), + ) + ) + + result = sequence.run(session.server) + readback = api.ServerCmd.ShowOption(option_name=key).run(session.server) + + assert result.returncode != 0 + assert readback.stdout == ["before"] diff --git a/tests/chainable_commands_experiment/test_context_api.py b/tests/chainable_commands_experiment/test_context_api.py new file mode 100644 index 000000000..d6c189fc0 --- /dev/null +++ b/tests/chainable_commands_experiment/test_context_api.py @@ -0,0 +1,29 @@ +"""Tests for an explicit typed command-batch context API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import context_api as api +from .shared import CommandCall, CommandSequence + + +def test_context_api_batches_typed_methods() -> None: + """Context batching keeps completion on explicit batch methods.""" + with api.CommandBatch() as batch: + call = batch.new_window(window_name="work") + batch.split_window(horizontal=True, percentage=50) + + assert_type(call, CommandCall) + assert_type(batch.to_sequence(), CommandSequence) + assert batch.to_sequence().argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ) diff --git a/tests/chainable_commands_experiment/test_dag_api.py b/tests/chainable_commands_experiment/test_dag_api.py new file mode 100644 index 000000000..2166d27f6 --- /dev/null +++ b/tests/chainable_commands_experiment/test_dag_api.py @@ -0,0 +1,45 @@ +"""Tests for a Hamilton-style command DAG API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import dag_api as api +from .shared import CommandCall, CommandSequence + + +@api.command_step("new_window") +def _new_window() -> CommandCall: + """Build the first command in the DAG.""" + return CommandCall("new-window", ("-d", "-n", "work")) + + +@api.command_step("split_window", depends_on=("new_window",)) +def _split_window() -> CommandCall: + """Build the dependent command in the DAG.""" + return CommandCall("split-window", ("-h",)) + + +def test_dag_api_orders_steps_by_dependencies() -> None: + """The requested output determines the command dependency order.""" + dag = api.CommandDag((_split_window, _new_window), outputs=("split_window",)) + + sequence = dag.sequence() + + assert_type(sequence, CommandSequence) + assert sequence.argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + ) + + +def test_dag_api_rejects_missing_dependencies() -> None: + """DAG validation catches incomplete output plans.""" + missing = api.CommandDag((_split_window,), outputs=("split_window",)) + + assert missing.missing_dependencies() == ("new_window",) diff --git a/tests/chainable_commands_experiment/test_decorator_api.py b/tests/chainable_commands_experiment/test_decorator_api.py new file mode 100644 index 000000000..915134c67 --- /dev/null +++ b/tests/chainable_commands_experiment/test_decorator_api.py @@ -0,0 +1,45 @@ +"""Tests for a Django-style decorator command API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import decorator_api as api +from .shared import CommandCall + + +def test_decorator_api_preserves_typed_call_signature() -> None: + """Decorated command factories keep normal function-call ergonomics.""" + call = api.new_window(window_name="work", detach=True) + + assert_type(call, CommandCall) + assert call.argv() == ("new-window", "-d", "-n", "work") + + +def test_decorator_api_exposes_command_metadata() -> None: + """Command metadata can be recovered from decorated callables.""" + spec = api.get_command_spec(api.new_window) + + assert spec.name == "new-window" + assert spec.scope == "session" + assert spec.chainable is True + + +def test_decorator_api_builds_chain() -> None: + """Decorator factories compose through the shared chain operators.""" + chain = api.new_window(window_name="work") >> api.split_window( + horizontal=True, + percentage=50, + ) + + assert chain.argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ) diff --git a/tests/chainable_commands_experiment/test_deferred_plan_api.py b/tests/chainable_commands_experiment/test_deferred_plan_api.py new file mode 100644 index 000000000..ad44ab7da --- /dev/null +++ b/tests/chainable_commands_experiment/test_deferred_plan_api.py @@ -0,0 +1,271 @@ +"""Tests for deferred query-driven command plan experiments.""" + +from __future__ import annotations + +import typing as t +from dataclasses import dataclass, field + +import pytest +from typing_extensions import assert_type + +from . import deferred_plan_api as api +from .shared import Arg + + +@dataclass +class _FakeResult: + """Small command result for deferred-plan runner tests.""" + + stdout: list[str] = field(default_factory=list) + stderr: list[str] = field(default_factory=list) + returncode: int = 0 + + +@dataclass +class _FakeRunner: + """Runner that exposes a snapshot and records tmux dispatches.""" + + snapshot_value: api.TmuxSnapshot + calls: list[tuple[str, tuple[Arg, ...], str | int | None]] = field( + default_factory=list, + ) + + def snapshot(self) -> api.TmuxSnapshot: + """Return the fixed tmux snapshot.""" + return self.snapshot_value + + def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> _FakeResult: + """Record one command dispatch.""" + self.calls.append((cmd, args, target)) + return _FakeResult(stdout=["ok"]) + + +def _snapshot() -> api.TmuxSnapshot: + return api.TmuxSnapshot( + panes=( + api.PaneRef( + pane_id=api.PaneTarget("%2"), + window_id=api.WindowTarget("@1"), + session_id=api.SessionTarget("$0"), + pane_index=2, + active=True, + title="shell", + ), + api.PaneRef( + pane_id=api.PaneTarget("%1"), + window_id=api.WindowTarget("@1"), + session_id=api.SessionTarget("$0"), + pane_index=1, + active=True, + title="editor", + ), + api.PaneRef( + pane_id=api.PaneTarget("%3"), + window_id=api.WindowTarget("@2"), + session_id=api.SessionTarget("$0"), + pane_index=3, + active=False, + title="logs", + ), + ), + ) + + +def test_typed_targets_and_bound_commands_render_targets() -> None: + """Bound command namespaces keep pane and window targets typed.""" + pane = _snapshot().panes[0] + + pane_call = pane.cmd.send_keys("clear", enter=True) + window_call = pane.window.select_layout("even-horizontal") + + assert_type(pane.pane_id, api.PaneTarget) + assert_type(pane.window_id, api.WindowTarget) + assert_type(pane.session_id, api.SessionTarget) + assert pane_call.argv() == ("send-keys", "-t", "%2", "clear", "Enter") + assert window_call.argv() == ( + "select-layout", + "-t", + "@1", + "even-horizontal", + ) + + +def test_each_defers_mapper_until_sequence_resolution() -> None: + """``each`` stores a plan node instead of eagerly calling the mapper.""" + mapper_calls: list[api.PaneRef] = [] + + def mapper(pane: api.PaneRef) -> api.CommandValue: + mapper_calls.append(pane) + return pane.cmd.resize_pane(height=20) + + plan = api.panes().filter(active=True).each(mapper) + + assert_type(plan, api.CommandPlan[None]) + assert mapper_calls == [] + + sequence = plan.to_sequence(_snapshot()) + + assert [pane.pane_id.value for pane in mapper_calls] == ["%2", "%1"] + assert sequence.argvs() == ( + ("resize-pane", "-t", "%2", "-y", "20"), + ("resize-pane", "-t", "%1", "-y", "20"), + ) + + +def test_snapshot_sequence_filters_orders_and_flattens_commands() -> None: + """Snapshot compilation gives pure assertions without touching tmux.""" + plan = ( + api.panes() + .filter(active=True) + .order_by("pane_index") + .each( + lambda pane: [ + pane.cmd.send_keys("clear", enter=True), + pane.cmd.resize_pane(height=20), + ], + ) + ) + + sequence = plan.to_sequence(_snapshot()) + + assert sequence.argvs() == ( + ("send-keys", "-t", "%1", "clear", "Enter"), + ("resize-pane", "-t", "%1", "-y", "20"), + ("send-keys", "-t", "%2", "clear", "Enter"), + ("resize-pane", "-t", "%2", "-y", "20"), + ) + assert sequence.argv() == ( + "send-keys", + "-t", + "%1", + "clear", + "Enter", + ";", + "resize-pane", + "-t", + "%1", + "-y", + "20", + ";", + "send-keys", + "-t", + "%2", + "clear", + "Enter", + ";", + "resize-pane", + "-t", + "%2", + "-y", + "20", + ) + + +def test_flat_map_expands_multiple_commands_per_row() -> None: + """``flat_map`` exposes the explicit multi-command row expansion.""" + plan = ( + api.panes() + .filter(active=True) + .flat_map( + lambda pane: ( + pane.cmd.resize_pane(height=10), + pane.window.select_layout("even-horizontal"), + ), + ) + ) + + assert plan.to_sequence(_snapshot()).argvs() == ( + ("resize-pane", "-t", "%2", "-y", "10"), + ("select-layout", "-t", "@1", "even-horizontal"), + ("resize-pane", "-t", "%1", "-y", "10"), + ("select-layout", "-t", "@1", "even-horizontal"), + ) + + +def test_map_transforms_rows_without_creating_commands() -> None: + """``map`` remains data-oriented and separate from command construction.""" + query = api.panes().filter(active=True).order_by("pane_index") + + titles = query.map(lambda pane: pane.title).all(_snapshot()) + + assert_type(titles, list[str]) + assert titles == ["editor", "shell"] + + +def test_run_resolves_live_snapshot_and_dispatches_once() -> None: + """``run`` resolves the query and executes one native tmux sequence.""" + runner = _FakeRunner(_snapshot()) + plan = ( + api.panes() + .filter(active=True) + .order_by("pane_index") + .each( + lambda pane: pane.cmd.send_keys("clear", enter=True), + ) + ) + + plan.run(runner) + + assert runner.calls == [ + ( + "send-keys", + ( + "-t", + "%1", + "clear", + "Enter", + ";", + "send-keys", + "-t", + "%2", + "clear", + "Enter", + ), + None, + ), + ] + + +def test_to_sequence_uses_runner_snapshot_without_dispatching() -> None: + """Resolving against a runner still keeps execution explicit.""" + runner = _FakeRunner(_snapshot()) + plan = ( + api.panes() + .filter(active=True) + .each( + lambda pane: pane.cmd.resize_pane(height=12), + ) + ) + + sequence = plan.to_sequence(runner) + + assert sequence.argvs() == ( + ("resize-pane", "-t", "%2", "-y", "12"), + ("resize-pane", "-t", "%1", "-y", "12"), + ) + assert runner.calls == [] + + +def test_empty_query_to_sequence_raises_but_run_is_noop() -> None: + """Empty query plans are inspectably empty and executable as no-ops.""" + runner = _FakeRunner(api.TmuxSnapshot(panes=())) + plan = api.panes().each(lambda pane: pane.cmd.resize_pane(height=20)) + + with pytest.raises(api.NoCommandsResolved): + plan.to_sequence(runner) + + plan.run(runner) + assert runner.calls == [] + + +def test_each_rejects_string_iterable_command_results() -> None: + """String-like mapper results are not accepted as command iterables.""" + plan = api.panes().each(lambda pane: t.cast(t.Any, pane.title)) + + with pytest.raises(TypeError, match="command mapper"): + plan.to_sequence(_snapshot()) diff --git a/tests/chainable_commands_experiment/test_descriptor_api.py b/tests/chainable_commands_experiment/test_descriptor_api.py new file mode 100644 index 000000000..e0c5483aa --- /dev/null +++ b/tests/chainable_commands_experiment/test_descriptor_api.py @@ -0,0 +1,47 @@ +"""Tests for typed command descriptor objects.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import descriptor_api as api +from .shared import CommandCall + + +def test_descriptor_api_binds_typed_command_object() -> None: + """Descriptors expose a concrete command object for completion.""" + commands = api.Commands() + + assert_type(commands.new_window, api.NewWindowCommand) + assert_type(commands.split_window, api.SplitWindowCommand) + + +def test_descriptor_api_builds_invocation_with_metadata() -> None: + """Bound descriptor calls produce command invocations.""" + commands = api.Commands() + call = commands.new_window(window_name="work", detach=True) + + assert_type(call, CommandCall) + assert commands.new_window.spec.name == "new-window" + assert call.argv() == ("new-window", "-d", "-n", "work") + + +def test_descriptor_api_builds_chain() -> None: + """Descriptor command objects compose through the shared chain type.""" + commands = api.Commands() + chain = commands.new_window(window_name="work") >> commands.split_window( + horizontal=True, + percentage=50, + ) + + assert chain.argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ) diff --git a/tests/chainable_commands_experiment/test_document_query_api.py b/tests/chainable_commands_experiment/test_document_query_api.py new file mode 100644 index 000000000..2b578183a --- /dev/null +++ b/tests/chainable_commands_experiment/test_document_query_api.py @@ -0,0 +1,26 @@ +"""Tests for a Beanie-style document command query API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import document_query_api as api + + +def test_document_query_api_filters_command_documents() -> None: + """Document-shaped metadata supports simple typed filtering.""" + documents = ( + api.CommandDocument(name="capture-pane", scope="pane", chainable=True), + api.CommandDocument(name="new-window", scope="session", chainable=True), + api.CommandDocument(name="display-message", scope="server", chainable=False), + ) + query = ( + api.CommandDocumentQuery(documents) + .where(scope="pane", chainable=True) + .where_name("capture-pane") + ) + + result = query.all() + + assert_type(query, api.CommandDocumentQuery) + assert result == [documents[0]] diff --git a/tests/chainable_commands_experiment/test_expression_api.py b/tests/chainable_commands_experiment/test_expression_api.py new file mode 100644 index 000000000..d272ea018 --- /dev/null +++ b/tests/chainable_commands_experiment/test_expression_api.py @@ -0,0 +1,33 @@ +"""Tests for an Ibis-style typed expression API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import expression_api as api + + +def test_expression_api_compiles_typed_fields_and_predicates() -> None: + """Expression trees compile before execution.""" + pane = api.PaneTable() + expr = pane.where(pane.active.eq(True)).select(pane.id, pane.title) + + compiled = expr.compile() + + assert_type(expr, api.TableExpression) + assert compiled.fields == ("pane_id", "pane_title") + assert compiled.predicates == ("pane_active=True",) + + +def test_expression_api_executes_against_backend_rows() -> None: + """A backend runner owns materialization.""" + pane = api.PaneTable() + expr = pane.where(pane.active.eq(True)).select(pane.id) + runner = api.ExpressionRunner( + ( + {"pane_id": "%1", "pane_active": True}, + {"pane_id": "%2", "pane_active": False}, + ), + ) + + assert expr.execute(runner) == [{"pane_id": "%1"}] diff --git a/tests/chainable_commands_experiment/test_generated_client_api.py b/tests/chainable_commands_experiment/test_generated_client_api.py new file mode 100644 index 000000000..374afba6e --- /dev/null +++ b/tests/chainable_commands_experiment/test_generated_client_api.py @@ -0,0 +1,39 @@ +"""Tests for a Prisma-style generated command client API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import generated_client_api as api +from .shared import CommandCall, CommandSequence + + +def test_generated_client_api_exposes_nested_typed_namespaces() -> None: + """Generated namespaces provide explicit completion surfaces.""" + commands = api.GeneratedCommands() + + call = commands.window.rename(target="@1", name="editor") + + assert_type(call, CommandCall) + assert call.argv() == ("rename-window", "-t", "@1", "editor") + + +def test_generated_client_api_composes_generated_calls() -> None: + """Generated command calls still use the shared sequence IR.""" + commands = api.GeneratedCommands() + sequence = commands.session.new_window(name="editor").then( + commands.window.rename(target="@1", name="editor"), + ) + + assert_type(sequence, CommandSequence) + assert sequence.argv() == ( + "new-window", + "-d", + "-n", + "editor", + ";", + "rename-window", + "-t", + "@1", + "editor", + ) diff --git a/tests/chainable_commands_experiment/test_lazy_plan_api.py b/tests/chainable_commands_experiment/test_lazy_plan_api.py new file mode 100644 index 000000000..b41f1490c --- /dev/null +++ b/tests/chainable_commands_experiment/test_lazy_plan_api.py @@ -0,0 +1,36 @@ +"""Tests for a Polars-style lazy command plan API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import lazy_plan_api as api +from .shared import CommandCall + + +def test_lazy_plan_api_filters_selects_and_collects() -> None: + """Lazy plans collect command rows only at the terminal boundary.""" + plan = api.LazyCommandPlan.from_calls( + ( + api.PlannedCall("window", CommandCall("rename-window", ("work",))), + api.PlannedCall("pane", CommandCall("capture-pane", ("-p",), target="%1")), + ), + ) + selected = plan.filter_scope("pane").select("name", "target") + + rows = selected.collect() + + assert_type(selected, api.LazyCommandPlan) + assert rows == [api.CommandRow(name="capture-pane", target="%1")] + + +def test_lazy_plan_api_explains_optimization_boundary() -> None: + """The demo exposes an explicit plan boundary before collection.""" + plan = api.LazyCommandPlan.from_calls(()).filter_scope("pane").select("name") + + optimized = plan.optimize() + + assert optimized.explain() == ( + "filter_scope=pane", + "select=name", + ) diff --git a/tests/chainable_commands_experiment/test_orchestration_api.py b/tests/chainable_commands_experiment/test_orchestration_api.py new file mode 100644 index 000000000..e6aa40786 --- /dev/null +++ b/tests/chainable_commands_experiment/test_orchestration_api.py @@ -0,0 +1,42 @@ +"""Tests for a Prefect-style command orchestration API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import orchestration_api as api +from .shared import CommandCall, CommandSequence + + +def test_orchestration_api_submits_typed_command_tasks() -> None: + """Submitting a task defers creation of a concrete command call.""" + task = api.CommandTask(api.rename_window) + submitted = task.submit("@1", "editor") + + assert_type(submitted, api.SubmittedCommand) + assert submitted.call.argv() == ("rename-window", "-t", "@1", "editor") + + +def test_orchestration_api_maps_submitted_commands_to_sequence() -> None: + """Mapped submissions can still collapse to a native tmux sequence.""" + task = api.CommandTask(api.rename_window) + submitted = task.map((("@1", "editor"), ("@2", "logs"))) + + sequence = api.submitted_sequence(submitted) + + assert_type(sequence, CommandSequence) + assert [item.call for item in submitted] == [ + CommandCall("rename-window", ("editor",), target="@1"), + CommandCall("rename-window", ("logs",), target="@2"), + ] + assert sequence.argv() == ( + "rename-window", + "-t", + "@1", + "editor", + ";", + "rename-window", + "-t", + "@2", + "logs", + ) diff --git a/tests/chainable_commands_experiment/test_queryset_api.py b/tests/chainable_commands_experiment/test_queryset_api.py new file mode 100644 index 000000000..9edc9385d --- /dev/null +++ b/tests/chainable_commands_experiment/test_queryset_api.py @@ -0,0 +1,34 @@ +"""Tests for a Django QuerySet-style lazy command query API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import queryset_api as api + + +def test_queryset_api_filters_orders_and_limits_lazily() -> None: + """Pane queries stack filters until terminal evaluation.""" + rows = ( + api.PaneRow("%2", pane_index=2, active=True, title="shell"), + api.PaneRow("%1", pane_index=1, active=True, title="editor"), + api.PaneRow("%3", pane_index=3, active=False, title="logs"), + ) + runner = api.StaticPaneRunner(rows) + query = api.PaneQuery().filter(active=True).order_by("pane_index").limit(1) + + result = query.all(runner) + + assert_type(query, api.PaneQuery) + assert_type(result, list[api.PaneRow]) + assert result == [rows[1]] + assert query.first(runner) == rows[1] + + +def test_queryset_api_original_query_stays_unchanged() -> None: + """Lazy query methods return new query objects.""" + query = api.PaneQuery() + filtered = query.filter(active=False) + + assert query is not filtered + assert filtered.all(api.StaticPaneRunner((api.PaneRow("%1", 1, False, "logs"),))) diff --git a/tests/chainable_commands_experiment/test_runnable_api.py b/tests/chainable_commands_experiment/test_runnable_api.py new file mode 100644 index 000000000..d03561fb5 --- /dev/null +++ b/tests/chainable_commands_experiment/test_runnable_api.py @@ -0,0 +1,52 @@ +"""Tests for a LangChain-style runnable command API.""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +from typing_extensions import assert_type + +from . import runnable_api as api +from .shared import Arg + + +@dataclass +class _FakeRunner: + """Record runnable command dispatches.""" + + calls: list[tuple[str, tuple[Arg, ...], str | int | None]] = field( + default_factory=list, + ) + + def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> api.RunRecord: + """Record one command dispatch.""" + self.calls.append((cmd, args, target)) + return api.RunRecord(command=cmd, args=args, target=target, stdout=["ok"]) + + +def test_runnable_api_invokes_batches_and_streams() -> None: + """Runnable commands support symmetric single and batch execution.""" + runner = _FakeRunner() + pipeline = api.target_capture_call().then(api.run_command()) + + result = pipeline.invoke("%1", runner) + results = pipeline.batch(["%2", "%3"], runner) + streamed = list(pipeline.stream("%4", runner)) + + assert_type(pipeline, api.RunnableCommand[str, api.RunRecord]) + assert result.command == "capture-pane" + assert results[0].target == "%2" + assert streamed[0].target == "%4" + + +def test_runnable_api_composes_with_shift_operator() -> None: + """The shorthand composition operator remains type preserving.""" + pipeline = api.target_capture_call() >> api.render_argv() + + assert_type(pipeline, api.RunnableCommand[str, tuple[str, ...]]) + assert pipeline.invoke("%1", _FakeRunner()) == ("capture-pane", "-t", "%1", "-p") diff --git a/tests/chainable_commands_experiment/test_selection_api.py b/tests/chainable_commands_experiment/test_selection_api.py new file mode 100644 index 000000000..c92964b5e --- /dev/null +++ b/tests/chainable_commands_experiment/test_selection_api.py @@ -0,0 +1,32 @@ +"""Tests for a GraphQL-style nested tmux selection API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import selection_api as api + + +def test_selection_api_compiles_explicit_depth() -> None: + """Nested selection makes traversal depth explicit.""" + query = ( + api.TmuxSelection.server() + .sessions() + .windows() + .panes() + .fields("pane_id", "pane_title") + ) + + plan = query.compile() + + assert_type(query, api.SelectionQuery) + assert plan.scopes == ("server", "session", "window", "pane") + assert plan.fields == ("pane_id", "pane_title") + + +def test_selection_api_runner_materializes_payload() -> None: + """The runner executes the compiled selection plan.""" + query = api.TmuxSelection.server().sessions().fields("session_id") + runner = api.StaticSelectionRunner({"session_id": ("$1", "$2")}) + + assert query.run(runner) == {"session_id": ("$1", "$2")} diff --git a/tests/chainable_commands_experiment/test_shared.py b/tests/chainable_commands_experiment/test_shared.py new file mode 100644 index 000000000..89294ec93 --- /dev/null +++ b/tests/chainable_commands_experiment/test_shared.py @@ -0,0 +1,132 @@ +"""Tests for the shared command-chain substrate.""" + +from __future__ import annotations + +import typing as t +from dataclasses import dataclass, field + +from typing_extensions import assert_type + +from libtmux import Session + +from .shared import CommandCall, CommandSequence + +Arg: t.TypeAlias = str | int + + +@dataclass +class _FakeResult: + """Small command result for fake runner tests.""" + + stdout: list[str] = field(default_factory=list) + stderr: list[str] = field(default_factory=list) + returncode: int = 0 + + +@dataclass +class _FakeRunner: + """Record calls made by CommandSequence.run().""" + + calls: list[tuple[str, tuple[Arg, ...], str | int | None]] = field( + default_factory=list, + ) + + def cmd( + self, + cmd: str, + *args: Arg, + target: str | int | None = None, + ) -> _FakeResult: + """Record one command dispatch.""" + self.calls.append((cmd, args, target)) + return _FakeResult(stdout=["ok"]) + + +def test_command_sequence_renders_tmux_semicolon_sequence() -> None: + """Render a native tmux sequence with semicolon separator tokens.""" + sequence = CommandCall("new-window", ("-d", "-n", "work")).then( + CommandCall("split-window", ("-h", "-p", 50)), + ) + + assert_type(sequence, CommandSequence) + assert sequence.argv() == ( + "new-window", + "-d", + "-n", + "work", + ";", + "split-window", + "-h", + "-p", + "50", + ) + + +def test_command_sequence_escapes_literal_semicolon_arguments() -> None: + """Render literal trailing semicolons as arguments, not separators.""" + sequence = CommandSequence( + ( + CommandCall("display-message", ("value;",)), + CommandCall("display-message", (";",)), + ), + ) + + assert sequence.argv() == ( + "display-message", + "value\\;", + ";", + "display-message", + "\\;", + ) + + +def test_command_sequence_runs_as_single_runner_call() -> None: + """Execute a sequence through one runner call.""" + runner = _FakeRunner() + result = ( + CommandCall("new-window", ("-d", "-n", "work")) + >> CommandCall("split-window", ("-h", "-p", 50)) + ).run(runner) + + assert result.stdout == ["ok"] + assert runner.calls == [ + ( + "new-window", + ("-d", "-n", "work", ";", "split-window", "-h", "-p", "50"), + None, + ), + ] + + +def test_tmux_executes_native_command_sequence(session: Session) -> None: + """A successful native sequence updates state in order.""" + key = "@libtmux_chainable_experiment_success" + sequence = CommandCall("set-option", ("-gq", key, "before")).then( + CommandCall("set-option", ("-gq", key, "after")), + ) + + result = sequence.run(session.server) + + readback = CommandSequence((CommandCall("show-option", ("-gqv", key)),)).run( + session.server, + ) + assert result.returncode == 0 + assert readback.stdout == ["after"] + + +def test_tmux_stops_native_sequence_after_error(session: Session) -> None: + """Tmux skips later commands in a sequence after an error.""" + key = "@libtmux_chainable_experiment_error" + sequence = ( + CommandCall("set-option", ("-gq", key, "before")) + >> CommandCall("has-session", ("-t", "definitely_missing_session")) + >> CommandCall("set-option", ("-gq", key, "after")) + ) + + result = sequence.run(session.server) + + readback = CommandSequence((CommandCall("show-option", ("-gqv", key)),)).run( + session.server, + ) + assert result.returncode != 0 + assert readback.stdout == ["before"] diff --git a/tests/chainable_commands_experiment/test_statement_api.py b/tests/chainable_commands_experiment/test_statement_api.py new file mode 100644 index 000000000..eed86463a --- /dev/null +++ b/tests/chainable_commands_experiment/test_statement_api.py @@ -0,0 +1,31 @@ +"""Tests for a SQLAlchemy-style immutable command statement API.""" + +from __future__ import annotations + +from typing_extensions import assert_type + +from . import statement_api as api +from .shared import CommandCall + + +def test_statement_api_builds_command_call_genuinely() -> None: + """Generative statement methods return a typed command call at the boundary.""" + stmt = ( + api.CommandStatement("new-window").target("$1").flag("-n", "editor").arg("vim") + ) + call = stmt.to_call() + + assert_type(stmt, api.CommandStatement) + assert_type(call, CommandCall) + assert call.argv() == ("new-window", "-t", "$1", "-n", "editor", "vim") + + +def test_statement_api_runner_executes_statement() -> None: + """A runner can own the execution boundary.""" + runner = api.StatementRunner() + stmt = api.CommandStatement("display-message").arg("hello") + + result = runner.execute(stmt) + + assert result.argv == ("display-message", "hello") + assert runner.executed == [("display-message", "hello")]