diff --git a/renpybuild/context.py b/renpybuild/context.py index b96a2783..f6d98853 100644 --- a/renpybuild/context.py +++ b/renpybuild/context.py @@ -2,12 +2,9 @@ import shutil from pathlib import Path import subprocess -import shutil import jinja2 -import renpybuild.run - from typing import Any @@ -226,7 +223,9 @@ def set_names(self, kind : str, task : str, name : str): else: self.var("dlpa", "{{distlib}}/py{{ python }}-{{ platform }}-{{ arch }}") - renpybuild.run.build_environment(self) + from .run import build_environment + + build_environment(self) def expand(self, s : str, **kwargs) -> str: """ @@ -319,7 +318,10 @@ def run(self, command : str, verbose : bool=False, quiet : bool=False, **kwargs) """ command = self.expand(command, **kwargs) - renpybuild.run.run(command, self, verbose, quiet) + + from .run import run + + run(command, self, verbose, quiet) def run_group(self): """ @@ -327,7 +329,9 @@ def run_group(self): that allows multiple commands to be run in parallel. """ - return renpybuild.run.RunGroup(self) + from .run import RunGroup + + return RunGroup(self) def clean(self, d : str="{{build}}"): """ diff --git a/renpybuild/run.py b/renpybuild/run.py index 211eba89..0816c8c2 100644 --- a/renpybuild/run.py +++ b/renpybuild/run.py @@ -1,12 +1,13 @@ import os import re +import sys import shlex import subprocess -import sys import sysconfig -import threading -import jinja2 +from concurrent.futures import Future, ThreadPoolExecutor, as_completed + +from .context import Context # This caches the results of emsdk_environment. emsdk_cache : dict[str, str] = { } @@ -441,38 +442,20 @@ def run(command, context, verbose=False, quiet=False): traceback.print_stack() sys.exit(1) -class RunCommand(threading.Thread): - - def __init__(self, command, context): - super().__init__() - - command = context.expand(command) - self.command = shlex.split(command) - - self.cwd = context.cwd - self.environ = context.environ.copy() - - self.start() +class CommandResult: + """Stores the result of a single command execution.""" - def run(self): - result = subprocess.run(self.command, cwd=self.cwd, env=self.environ, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8") - self.output = result.stdout - self.code = result.returncode - - def wait(self): - self.join() + def __init__(self, command_str: str, future: Future[tuple[str, int]]): + self.command_str = command_str + self.future = future + self.output: str = "" + self.code: int = 0 + self.done: bool = False def report(self): - print ("-" * 78) - - for i in self.command: - if " " in i: - print(repr(i), end=" ") - else: - print(i, end=" ") - - print() + print("-" * 78) + print(self.command_str) print() print(self.output) @@ -480,11 +463,13 @@ def report(self): print() print(f"Process failed with {self.code}.") -class RunGroup(object): - def __init__(self, context): +class RunGroup: + def __init__(self, context: Context, wait_all: bool = True): + self.executor = ThreadPoolExecutor() self.context = context - self.tasks = [ ] + self.futures: list[CommandResult] = [] + self.wait_all = wait_all def __enter__(self): return self @@ -493,22 +478,99 @@ def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None: return - for i in self.tasks: - i.wait() + # If there are no tasks to do, exit early + if not next(not f.done for f in self.futures): + return - good = [ i for i in self.tasks if i.code == 0 ] - bad = [ i for i in self.tasks if i.code != 0 ] + futures = [f.future for f in self.futures] + total = len(futures) + completed = 0 + failed = 0 - for i in good: - i.report() + stderr = sys.stderr + if stderr is None: + stderr = open(os.devnull, "w") - for i in bad: - i.report() + spinner_chars = "|/-\\" + spinner_i = 0 + last_write_len = 0 + interrupted = False + + while futures: + try: + future = next(as_completed(futures, 0.1)) + cmd_result = next(f for f in self.futures if f.future == future) + futures.remove(future) + + # Clean last spinner output + print(f"\r{' ' * last_write_len}\r", end="", file=stderr) + + cmd_result.output, cmd_result.code = future.result() + cmd_result.done = True + cmd_result.report() + + if cmd_result.code == 0: + completed += 1 + else: + failed += 1 + + except TimeoutError: + pass + + except KeyboardInterrupt: + interrupted = True + + # Update spinner output after new report or timeout + spinner_i = (spinner_i + 1) % len(spinner_chars) + failed_str = f" ({failed} failed)" if failed else "" + out_text = f"Run group working... {spinner_chars[spinner_i]} {completed}/{total}{failed_str}" + print(f"\r{out_text}", end="", file=stderr) + stderr.flush() + last_write_len = len(out_text) + + if not self.wait_all and failed > 0: + break + + if interrupted: + break + + if interrupted: + self.executor.shutdown(wait=False, cancel_futures=True) + print("\nRun group interrupted.") + raise KeyboardInterrupt + + # Clear the spinner line before exiting + print(f"\r{' ' * last_write_len}\r", end="", file=stderr) + stderr.flush() + + if failed > 0: + if self.wait_all: + print(f"\n{failed} tasks failed.") + + for result in (r for r in self.futures if r.code): + result.report() - if bad: - print() - print("{} tasks failed.".format(len(bad))) sys.exit(1) - def run(self, command): - self.tasks.append(RunCommand(command, self.context)) + def _execute_command(self, command: list[str], cwd: str, env: dict): + process = subprocess.run( + command, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + encoding="utf-8", + check=False, + ) + return process.stdout, process.returncode + + def run(self, command: str): + command = self.context.expand(command) + cmd_env = self.context.environ.copy() + future = self.executor.submit( + self._execute_command, + shlex.split(command), + str(self.context.cwd), + cmd_env, + ) + self.futures.append(CommandResult(command, future))