diff --git a/gittensor/constants.py b/gittensor/constants.py index 0297ccd2..11b84505 100644 --- a/gittensor/constants.py +++ b/gittensor/constants.py @@ -105,6 +105,9 @@ # while holding the GIL. 2s is well above the millisecond cost of real files. TREE_SITTER_PARSE_TIMEOUT_MICROS = 2_000_000 +# Wall-clock budget (s) for the isolated-subprocess scoring path. +SCORING_SUBPROCESS_BUDGET_S = 5.0 + # comment nodes for token scoring COMMENT_NODE_TYPES = frozenset( { diff --git a/gittensor/validator/oss_contributions/mirror/scoring.py b/gittensor/validator/oss_contributions/mirror/scoring.py index 97c7910f..19a736b6 100644 --- a/gittensor/validator/oss_contributions/mirror/scoring.py +++ b/gittensor/validator/oss_contributions/mirror/scoring.py @@ -50,12 +50,12 @@ calculate_review_quality_multiplier, ) from gittensor.validator.utils.datetime_utils import calculate_time_decay +from gittensor.validator.utils.isolated_scoring import isolated_calculate_token_score from gittensor.validator.utils.load_weights import ( LanguageConfig, RepositoryConfig, TokenConfig, ) -from gittensor.validator.utils.tree_sitter_scoring import calculate_token_score_from_file_changes # ============================================================================ # Entry point @@ -277,7 +277,7 @@ def calculate_base_score_for_pr_files( they're populating (e.g. ``ScoredPR`` for OSS, ``Issue`` discovery fields for issue discovery). """ - scoring_result: PrScoringResult = calculate_token_score_from_file_changes( + scoring_result: PrScoringResult = isolated_calculate_token_score( file_changes, file_contents, token_config, diff --git a/gittensor/validator/utils/isolated_scoring.py b/gittensor/validator/utils/isolated_scoring.py new file mode 100644 index 00000000..a48da99f --- /dev/null +++ b/gittensor/validator/utils/isolated_scoring.py @@ -0,0 +1,134 @@ +# The MIT License (MIT) +# Copyright © 2025 Entrius +"""Subprocess-isolated wrapper around tree-sitter scoring. + +Uses a spawn-context pool (not fork - avoids inheriting bittensor +threads/sockets) with an external wall-clock budget; on +timeout/crash the child is killed and the PR zero-scored. +""" + +from __future__ import annotations + +import atexit +import multiprocessing +import multiprocessing.pool +import threading +from multiprocessing import get_context +from typing import TYPE_CHECKING, Dict, List, Optional + +import bittensor as bt + +from gittensor.classes import FileScoreResult, PrScoringResult +from gittensor.constants import SCORING_SUBPROCESS_BUDGET_S +from gittensor.utils.github_api_tools import FileContentPair +from gittensor.validator.utils.load_weights import LanguageConfig, TokenConfig + +if TYPE_CHECKING: + from gittensor.classes import FileChange + + +_pool: Optional[multiprocessing.pool.Pool] = None +_pool_lock = threading.Lock() + + +def _worker( + file_changes: List['FileChange'], + file_contents: Dict[str, FileContentPair], + weights: TokenConfig, + programming_languages: Dict[str, LanguageConfig], +) -> PrScoringResult: + # Lazy import: avoids pulling tree_sitter into importers of this module. + from gittensor.validator.utils.tree_sitter_scoring import ( + calculate_token_score_from_file_changes, + ) + + return calculate_token_score_from_file_changes( + file_changes, + file_contents, + weights, + programming_languages, + ) + + +def _ensure_pool() -> multiprocessing.pool.Pool: + global _pool + if _pool is None: + ctx = get_context('spawn') + _pool = ctx.Pool(processes=1) + return _pool + + +def _reset_pool() -> None: + global _pool + if _pool is None: + return + try: + _pool.terminate() + _pool.join() + except Exception: + pass + _pool = None + + +def _empty_pr_result(file_changes: List['FileChange']) -> PrScoringResult: + """Zero-scored result with one ``skipped-isolation-timeout`` entry per file""" + file_results = [ + FileScoreResult( + filename=f.short_name, + score=0.0, + nodes_scored=0, + total_lines=f.changes, + is_test_file=False, + scoring_method='skipped-isolation-timeout', + ) + for f in file_changes + ] + return PrScoringResult( + total_score=0.0, + total_nodes_scored=0, + total_lines=sum(f.changes for f in file_changes), + file_results=file_results, + ) + + +def shutdown() -> None: + """Tear down the worker pool; idempotent, registered with ``atexit``""" + with _pool_lock: + _reset_pool() + + +atexit.register(shutdown) + + +def isolated_calculate_token_score( + file_changes: List['FileChange'], + file_contents: Dict[str, FileContentPair], + weights: TokenConfig, + programming_languages: Dict[str, LanguageConfig], + timeout_s: float = SCORING_SUBPROCESS_BUDGET_S, +) -> PrScoringResult: + """Score a PR's files in an isolated subprocess with a hard wall-clock. + + On timeout or worker error the pool is reset and a zero-scored result is + returned. + """ + with _pool_lock: + pool = _ensure_pool() + async_res = pool.apply_async( + _worker, + (file_changes, file_contents, weights, programming_languages), + ) + + # Lock not held during get() - a 5s wait would needlessly serialize callers. + try: + return async_res.get(timeout=timeout_s) + except multiprocessing.TimeoutError: + bt.logging.warning( + f'Isolated scoring exceeded {timeout_s}s wall budget for {len(file_changes)} files - killing worker' + ) + except Exception as e: + bt.logging.warning(f'Isolated scoring worker raised {type(e).__name__}: {e!s} - resetting pool') + + with _pool_lock: + _reset_pool() + return _empty_pr_result(file_changes) diff --git a/tests/validator/utils/test_isolated_scoring.py b/tests/validator/utils/test_isolated_scoring.py new file mode 100644 index 00000000..6a8bca0b --- /dev/null +++ b/tests/validator/utils/test_isolated_scoring.py @@ -0,0 +1,100 @@ +# The MIT License (MIT) +# Copyright © 2025 Entrius +"""Tests for ``isolated_calculate_token_score`` timeout and exception paths""" + +from __future__ import annotations + +import multiprocessing + +import pytest + +from gittensor.classes import FileChange, PrScoringResult +from gittensor.validator.utils import isolated_scoring +from gittensor.validator.utils.load_weights import TokenConfig + + +def _change(name: str = 'a.py', changes: int = 10) -> FileChange: + return FileChange( + pr_number=1, + repository_full_name='x/y', + filename=name, + changes=changes, + additions=changes, + deletions=0, + status='added', + file_extension='py', + ) + + +class _FakeAsyncResult: + def __init__(self, exc: BaseException) -> None: + self._exc = exc + + def get(self, timeout: float | None = None): + raise self._exc + + +class _FakePool: + def __init__(self, exc: BaseException) -> None: + self._exc = exc + self.terminated = False + self.joined = False + + def apply_async(self, fn, args): + return _FakeAsyncResult(self._exc) + + def terminate(self) -> None: + self.terminated = True + + def join(self) -> None: + self.joined = True + + +@pytest.fixture(autouse=True) +def _reset_module_pool(): + isolated_scoring._pool = None + yield + isolated_scoring._pool = None + + +def _call_with_fake(fake: _FakePool, changes: list[FileChange], timeout_s: float = 0.01) -> PrScoringResult: + isolated_scoring._pool = fake + return isolated_scoring.isolated_calculate_token_score( + changes, + {}, + weights=TokenConfig(), + programming_languages={}, + timeout_s=timeout_s, + ) + + +def test_worker_timeout_yields_empty_result_and_resets_pool(): + fake = _FakePool(multiprocessing.TimeoutError()) + changes = [_change('a.py', 10), _change('b.py', 5)] + + result = _call_with_fake(fake, changes) + + assert isinstance(result, PrScoringResult) + assert result.total_score == 0.0 + assert result.total_nodes_scored == 0 + assert result.total_lines == 15 + assert [f.filename for f in result.file_results] == ['a.py', 'b.py'] + assert all(f.scoring_method == 'skipped-isolation-timeout' for f in result.file_results) + assert all(f.score == 0.0 and f.nodes_scored == 0 for f in result.file_results) + assert fake.terminated and fake.joined + assert isolated_scoring._pool is None + + +def test_worker_exception_yields_empty_result_and_resets_pool(): + fake = _FakePool(RuntimeError('boom')) + changes = [_change('a.py', 7)] + + result = _call_with_fake(fake, changes, timeout_s=1.0) + + assert isinstance(result, PrScoringResult) + assert result.total_score == 0.0 + assert len(result.file_results) == 1 + assert result.file_results[0].scoring_method == 'skipped-isolation-timeout' + assert result.file_results[0].total_lines == 7 + assert fake.terminated and fake.joined + assert isolated_scoring._pool is None