Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/chainable_commands_experiment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Typed command-chain API experiments for libtmux."""
98 changes: 98 additions & 0 deletions tests/chainable_commands_experiment/ast_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""AST-assisted command-chain experiment."""

from __future__ import annotations

import ast
import inspect
import textwrap
from collections.abc import Callable

from .shared import (
CommandCall,
CommandSequence,
new_window_call,
split_window_call,
)


class UnsupportedAstShape(ValueError):
"""Raised when a callable uses syntax outside the AST prototype."""


class AstCommandProxy:
"""Typed proxy used by the AST experiment."""

def __init__(self) -> None:
"""Initialize an empty proxy call list."""
self._calls: list[CommandCall] = []

def to_sequence(self) -> CommandSequence:
"""Return accumulated calls as a command sequence."""
return CommandSequence(tuple(self._calls))

def new_window(
self,
*,
window_name: str | None = None,
detach: bool = True,
) -> CommandCall:
"""Add a ``new-window`` proxy call."""
call = new_window_call(window_name=window_name, detach=detach)
self._calls.append(call)
return call

def split_window(
self,
*,
horizontal: bool = False,
percentage: int | None = None,
) -> CommandCall:
"""Add a ``split-window`` proxy call."""
call = split_window_call(
horizontal=horizontal,
percentage=percentage,
)
self._calls.append(call)
return call


def command_names_from_ast(
function: Callable[[AstCommandProxy], object],
) -> tuple[str, ...]:
"""Extract command method names from a restricted callable AST."""
function_def = _function_def_from_source(function)
if any(
isinstance(node, ast.For | ast.While | ast.If | ast.Match)
for node in ast.walk(function_def)
):
msg = "control flow is outside this AST prototype"
raise UnsupportedAstShape(msg)

names = [
node.func.attr
for node in ast.walk(function_def)
if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)
]
if not names:
msg = "no proxy command calls found"
raise UnsupportedAstShape(msg)
return tuple(names)


def from_callable(function: Callable[[AstCommandProxy], object]) -> CommandSequence:
"""Validate a callable's AST, then execute it against a typed proxy."""
command_names_from_ast(function)
proxy = AstCommandProxy()
function(proxy)
return proxy.to_sequence()


def _function_def_from_source(
function: Callable[[AstCommandProxy], object],
) -> ast.FunctionDef:
source = textwrap.dedent(inspect.getsource(function))
module = ast.parse(source)
if len(module.body) != 1 or not isinstance(module.body[0], ast.FunctionDef):
msg = "expected a single function definition"
raise UnsupportedAstShape(msg)
return module.body[0]
172 changes: 172 additions & 0 deletions tests/chainable_commands_experiment/async_deferred_plan_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""Asyncio facade over deferred query-command plan experiments."""

from __future__ import annotations

import dataclasses
import typing as t
from dataclasses import dataclass

from . import deferred_plan_api as sync_api
from .shared import Arg, CommandCall, CommandResultLike

MappedT = t.TypeVar("MappedT")
ResultT = t.TypeVar("ResultT")

NoCommandsResolved = sync_api.NoCommandsResolved


class AsyncCommandRunner(t.Protocol):
"""Object capable of asynchronously dispatching one tmux command argv."""

async def cmd(
self,
cmd: str,
*args: Arg,
target: str | int | None = None,
) -> CommandResultLike:
"""Dispatch one tmux command asynchronously."""
...


class AsyncSnapshotProvider(t.Protocol):
"""Object that can asynchronously provide a pure tmux snapshot."""

async def snapshot(self) -> sync_api.TmuxSnapshot:
"""Return a tmux snapshot asynchronously."""
...


class AsyncPlanRunner(
AsyncCommandRunner,
AsyncSnapshotProvider,
t.Protocol,
):
"""Runner that can resolve async queries and dispatch async commands."""


AsyncSnapshotSource: t.TypeAlias = sync_api.TmuxSnapshot | AsyncSnapshotProvider


@dataclass(frozen=True, slots=True)
class PaneQuery:
"""Async lazy pane query backed by the sync deferred query object."""

query: sync_api.PaneQuery

def filter(self, *, active: bool) -> PaneQuery:
"""Return a query filtered by active state."""
return dataclasses.replace(self, query=self.query.filter(active=active))

def order_by(self, field: sync_api.OrderField) -> PaneQuery:
"""Return a query ordered by a known pane field."""
return dataclasses.replace(self, query=self.query.order_by(field))

def limit(self, count: int) -> PaneQuery:
"""Return a query capped to ``count`` rows."""
return dataclasses.replace(self, query=self.query.limit(count))

async def all(self, source: AsyncSnapshotSource) -> list[sync_api.PaneRef]:
"""Evaluate the query against an async snapshot source."""
snapshot = await _resolve_snapshot(source)
return self.query.all(snapshot)

async def first(
self,
source: AsyncSnapshotSource,
) -> sync_api.PaneRef | None:
"""Evaluate the query and return its first row."""
snapshot = await _resolve_snapshot(source)
return self.query.first(snapshot)

def map(
self,
mapper: t.Callable[[sync_api.PaneRef], MappedT],
) -> MappedPaneQuery[MappedT]:
"""Return a data transformation query."""
return MappedPaneQuery(query=self, mapper=mapper)

def each(self, mapper: sync_api.CommandMapper) -> CommandPlan[None]:
"""Return a deferred async side-effect command plan."""
return self.flat_map(mapper)

def flat_map(self, mapper: sync_api.CommandMapper) -> CommandPlan[None]:
"""Return a deferred async multi-command side-effect plan."""
return CommandPlan(query=self, mapper=mapper)


@dataclass(frozen=True, slots=True)
class MappedPaneQuery(t.Generic[MappedT]):
"""Async data-only query transformation over pane refs."""

query: PaneQuery
mapper: t.Callable[[sync_api.PaneRef], MappedT]

async def all(self, source: AsyncSnapshotSource) -> list[MappedT]:
"""Evaluate the query and transform every row."""
return [self.mapper(row) for row in await self.query.all(source)]

async def first(self, source: AsyncSnapshotSource) -> MappedT | None:
"""Evaluate the query and transform the first row."""
row = await self.query.first(source)
if row is None:
return None
return self.mapper(row)


@dataclass(frozen=True, slots=True)
class CommandSequence:
"""Async wrapper around a resolved sync command sequence."""

sequence: sync_api.CommandSequence

@property
def calls(self) -> tuple[CommandCall, ...]:
"""Return resolved command calls."""
return self.sequence.calls

def argvs(self) -> tuple[tuple[str, ...], ...]:
"""Render each command independently."""
return self.sequence.argvs()

def argv(self) -> tuple[str, ...]:
"""Render one native tmux semicolon command sequence."""
return self.sequence.argv()

async def run(self, runner: AsyncCommandRunner) -> CommandResultLike:
"""Dispatch the sequence through one async runner call."""
argv = self.argv()
return await runner.cmd(argv[0], *argv[1:])


@dataclass(frozen=True, slots=True)
class CommandPlan(t.Generic[ResultT]):
"""Async command plan that resolves a query into commands."""

query: PaneQuery
mapper: sync_api.CommandMapper

async def to_sequence(self, source: AsyncSnapshotSource) -> CommandSequence:
"""Resolve the async query and compile mapped commands."""
snapshot = await _resolve_snapshot(source)
plan = self.query.query.flat_map(self.mapper)
return CommandSequence(plan.to_sequence(snapshot))

async def run(self: CommandPlan[None], runner: AsyncPlanRunner) -> None:
"""Resolve, compile, and execute the plan through one async dispatch."""
try:
sequence = await self.to_sequence(runner)
except NoCommandsResolved:
return None
await sequence.run(runner)
return None


def panes() -> PaneQuery:
"""Start an async lazy pane query."""
return PaneQuery(sync_api.panes())


async def _resolve_snapshot(source: AsyncSnapshotSource) -> sync_api.TmuxSnapshot:
if isinstance(source, sync_api.TmuxSnapshot):
return source
return await source.snapshot()
57 changes: 57 additions & 0 deletions tests/chainable_commands_experiment/async_query_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Piccolo-style async query experiment."""

from __future__ import annotations

import dataclasses
from dataclasses import dataclass


@dataclass(frozen=True, slots=True)
class AsyncPaneRow:
"""Small async pane row."""

pane_id: str
active: bool


@dataclass(frozen=True, slots=True)
class AsyncPaneRunner:
"""Async runner returning fixed pane rows."""

rows: tuple[AsyncPaneRow, ...]

async def list_panes(self) -> list[AsyncPaneRow]:
"""Return pane rows asynchronously."""
return list(self.rows)


@dataclass(frozen=True, slots=True)
class AsyncPaneQuery:
"""Immutable async pane query."""

active_filter: bool | None = None
limit_count: int | None = None

def where(self, *, active: bool) -> AsyncPaneQuery:
"""Return a query filtered by active state."""
return dataclasses.replace(self, active_filter=active)

def limit(self, count: int) -> AsyncPaneQuery:
"""Return a query capped to ``count`` rows."""
return dataclasses.replace(self, limit_count=count)

async def all(self, runner: AsyncPaneRunner) -> list[AsyncPaneRow]:
"""Evaluate the async query."""
rows = await runner.list_panes()
if self.active_filter is not None:
rows = [row for row in rows if row.active is self.active_filter]
if self.limit_count is not None:
rows = rows[: self.limit_count]
return rows

async def first(self, runner: AsyncPaneRunner) -> AsyncPaneRow | None:
"""Return the first matching row asynchronously."""
rows = await self.limit(1).all(runner)
if not rows:
return None
return rows[0]
Loading