Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Reproducible benchmark runner plumbing for python-docs-mcp-server."""
63 changes: 63 additions & 0 deletions benchmarks/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Command line entry point for the benchmark runner."""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

from benchmarks.runner import BenchmarkConfig, BenchmarkValidationError, run_benchmark


def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="python -m benchmarks",
description="Run the reproducible Python docs benchmark harness.",
)
subparsers = parser.add_subparsers(dest="command", required=True)

run = subparsers.add_parser("run", help="Run benchmark cells and write raw artifacts.")
run.add_argument("--corpus", required=True, type=Path, help="Path to the corpus YAML file.")
run.add_argument(
"--manifest",
required=True,
type=Path,
help="Path to the competitor manifest YAML file.",
)
run.add_argument("--out", required=True, type=Path, help="Output directory for run artifacts.")
run.add_argument("--run-id", help="Stable run identifier. Defaults to a UTC timestamp.")
run.add_argument(
"--dry-run",
action="store_true",
help="Validate inputs and emit planned benchmark cells without execution.",
)
return parser


def main(argv: list[str] | None = None) -> int:
parser = _build_parser()
args = parser.parse_args(argv)

if args.command == "run":
config = BenchmarkConfig(
corpus_path=args.corpus,
manifest_path=args.manifest,
out_dir=args.out,
run_id=args.run_id,
dry_run=args.dry_run,
)
try:
summary = run_benchmark(config)
except BenchmarkValidationError as exc:
parser.exit(2, f"benchmark validation failed: {exc}\n")

print(json.dumps(summary, indent=2, sort_keys=True))
return 0

parser.print_help()
return 2


if __name__ == "__main__":
sys.exit(main())
333 changes: 333 additions & 0 deletions benchmarks/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
"""Benchmark runner artifact plumbing.

This module intentionally contains only local fake/baseline execution. Real
provider adapters and report generation are follow-up work packages.
"""

from __future__ import annotations

import json
import platform
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import yaml

_SAFE_ID = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]*$")
_EXECUTABLE_ADAPTERS = {"fake", "no-mcp-baseline", "no_mcp_baseline"}


class BenchmarkValidationError(ValueError):
"""Raised when corpus or manifest input is not runnable."""


@dataclass(frozen=True)
class BenchmarkConfig:
"""Benchmark runner configuration."""

corpus_path: Path
manifest_path: Path
out_dir: Path
run_id: str | None = None
dry_run: bool = False


@dataclass(frozen=True)
class Question:
"""Validated corpus question."""

id: str
prompt: str
raw: dict[str, Any]


@dataclass(frozen=True)
class Competitor:
"""Validated competitor manifest entry."""

id: str
adapter: str
raw: dict[str, Any]


@dataclass(frozen=True)
class BenchmarkCell:
"""One competitor/question execution cell."""

competitor: Competitor
question: Question

@property
def cell_id(self) -> str:
return f"{self.competitor.id}/{self.question.id}"


def run_benchmark(config: BenchmarkConfig) -> dict[str, Any]:
"""Run or dry-run a benchmark and write the stable artifact layout."""
corpus_data = _load_yaml_mapping(config.corpus_path, "corpus")
manifest_data = _load_yaml_mapping(config.manifest_path, "manifest")
questions = _load_questions(corpus_data)
competitors = _load_competitors(manifest_data)
cells = [
BenchmarkCell(competitor=competitor, question=question)
for competitor in competitors
for question in questions
]

run_id = config.run_id or _default_run_id()
run_dir = config.out_dir
run_dir.mkdir(parents=True, exist_ok=True)
_write_artifact(run_dir / "snapshots" / "competitor-manifest.yml", manifest_data)
_write_artifact(run_dir / "snapshots" / "corpus.yml", corpus_data)

started_at = _utc_now()
environment = _environment_metadata(run_id=run_id, dry_run=config.dry_run)
_write_json(run_dir / "environment.json", environment)

planned_cells = [
{"competitor_id": cell.competitor.id, "corpus_id": cell.question.id}
for cell in cells
]
_write_json(run_dir / "planned-cells.json", {"cells": planned_cells})

succeeded = 0
failed = 0
if not config.dry_run:
for cell in cells:
result = _execute_cell(cell)
_write_cell_artifacts(run_dir, cell, result)
if result["status"] == "succeeded":
succeeded += 1
else:
failed += 1

summary = {
"run_id": run_id,
"dry_run": config.dry_run,
"started_at": started_at,
"completed_at": _utc_now(),
"corpus_path": str(config.corpus_path),
"manifest_path": str(config.manifest_path),
"artifact_root": str(run_dir),
"repo_commit": environment["repo_commit"],
"external_provider_calls": False,
"planned_cells": len(cells),
"succeeded_cells": succeeded,
"failed_cells": failed,
"competitors": [competitor.id for competitor in competitors],
"corpus_ids": [question.id for question in questions],
}
_write_json(run_dir / "run-summary.json", summary)
return summary


def _load_yaml_mapping(path: Path, label: str) -> dict[str, Any]:
if not path.exists():
raise BenchmarkValidationError(f"{label} file does not exist: {path}")
with path.open("r", encoding="utf-8") as file:
data = yaml.safe_load(file)
if not isinstance(data, dict):
raise BenchmarkValidationError(f"{label} file must contain a YAML mapping")
return data


def _load_questions(data: dict[str, Any]) -> list[Question]:
items = data.get("questions")
if not isinstance(items, list) or not items:
raise BenchmarkValidationError("corpus must contain a non-empty 'questions' list")

questions: list[Question] = []
seen: set[str] = set()
for index, item in enumerate(items):
if not isinstance(item, dict):
raise BenchmarkValidationError(f"corpus question at index {index} must be a mapping")
question_id = _required_safe_id(item, "id", f"corpus question at index {index}")
if question_id in seen:
raise BenchmarkValidationError(f"duplicate corpus id: {question_id}")
seen.add(question_id)
prompt = item.get("prompt")
if not isinstance(prompt, str) or not prompt.strip():
raise BenchmarkValidationError(f"corpus question {question_id} must include prompt")
questions.append(Question(id=question_id, prompt=prompt, raw=item))
return questions


def _load_competitors(data: dict[str, Any]) -> list[Competitor]:
items = data.get("competitors")
if not isinstance(items, list) or not items:
raise BenchmarkValidationError("manifest must contain a non-empty 'competitors' list")

competitors: list[Competitor] = []
seen: set[str] = set()
for index, item in enumerate(items):
if not isinstance(item, dict):
raise BenchmarkValidationError(f"competitor at index {index} must be a mapping")
competitor_id = _required_safe_id(item, "id", f"competitor at index {index}")
if competitor_id in seen:
raise BenchmarkValidationError(f"duplicate competitor id: {competitor_id}")
seen.add(competitor_id)
adapter = item.get("adapter")
if not isinstance(adapter, str) or not adapter.strip():
raise BenchmarkValidationError(f"competitor {competitor_id} must include adapter")
competitors.append(Competitor(id=competitor_id, adapter=adapter, raw=item))
return competitors


def _required_safe_id(item: dict[str, Any], key: str, label: str) -> str:
value = item.get(key)
if not isinstance(value, str) or not value.strip():
raise BenchmarkValidationError(f"{label} is missing required {key!r}")
if not _SAFE_ID.fullmatch(value):
raise BenchmarkValidationError(
f"{label} has unsafe {key!r}: {value!r}; use letters, numbers, dots, dashes, "
"or underscores"
)
return value


def _execute_cell(cell: BenchmarkCell) -> dict[str, Any]:
started = time.perf_counter()
started_at = _utc_now()
status = "succeeded"
error: dict[str, str] | None = None
answer = ""

try:
answer = _fake_provider_answer(cell)
except Exception as exc: # noqa: BLE001 - failures are benchmark artifacts
status = "failed"
error = {"type": type(exc).__name__, "message": str(exc)}

latency_ms = round((time.perf_counter() - started) * 1000, 3)
completed_at = _utc_now()
transcript = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"adapter": cell.competitor.adapter,
"status": status,
"started_at": started_at,
"completed_at": completed_at,
"messages": [{"role": "user", "content": cell.question.prompt}],
"answer": answer,
"error": error,
"external_provider_calls": False,
}
token_record = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"status": "placeholder",
"input_characters": len(cell.question.prompt),
"output_characters": len(answer),
"client_wrapped_tokens": None,
"raw_payload_tokens": None,
"notes": "Token counting integration is intentionally out of scope for issue #72.",
}
latency_record = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"status": status,
"latency_ms": latency_ms,
"started_at": started_at,
"completed_at": completed_at,
}
scoring_record = {
"competitor_id": cell.competitor.id,
"corpus_id": cell.question.id,
"status": "placeholder",
"score": None,
"requires_manual_scoring": True,
"notes": "Correctness scoring automation is intentionally out of scope for issue #72.",
}
return {
"status": status,
"transcript": transcript,
"tokens": token_record,
"latency": latency_record,
"scoring": scoring_record,
"failure": error,
}


def _fake_provider_answer(cell: BenchmarkCell) -> str:
adapter = cell.competitor.adapter
if adapter not in _EXECUTABLE_ADAPTERS:
raise RuntimeError(f"adapter {adapter!r} is not implemented in issue #72 runner")
if cell.competitor.raw.get("force_failure") is True:
raise RuntimeError("forced fake provider failure")
answer = cell.competitor.raw.get("fake_answer")
if isinstance(answer, str):
return answer
return f"[fake:{cell.competitor.id}] {cell.question.prompt}"


def _write_cell_artifacts(run_dir: Path, cell: BenchmarkCell, result: dict[str, Any]) -> None:
competitor_id = cell.competitor.id
corpus_id = cell.question.id
_write_json(run_dir / "transcripts" / competitor_id / f"{corpus_id}.json", result["transcript"])
_write_json(run_dir / "tokens" / competitor_id / f"{corpus_id}.json", result["tokens"])
_write_json(run_dir / "latency" / competitor_id / f"{corpus_id}.json", result["latency"])
_write_json(run_dir / "scoring" / competitor_id / f"{corpus_id}.json", result["scoring"])
if result["failure"] is not None:
_write_json(
run_dir / "failures" / competitor_id / f"{corpus_id}.json",
{
"competitor_id": competitor_id,
"corpus_id": corpus_id,
"status": "failed",
"error": result["failure"],
},
)


def _environment_metadata(*, run_id: str, dry_run: bool) -> dict[str, Any]:
return {
"run_id": run_id,
"created_at": _utc_now(),
"dry_run": dry_run,
"repo_commit": _repo_commit_sha(),
"python_version": sys.version,
"python_executable": sys.executable,
"platform": platform.platform(),
"system": platform.system(),
"machine": platform.machine(),
"benchmark_runner": "benchmarks",
"external_provider_calls": False,
}


def _repo_commit_sha() -> str:
try:
result = subprocess.run(
["git", "rev-parse", "HEAD"],
check=True,
capture_output=True,
text=True,
timeout=5,
)
except (OSError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
return "unknown"
return result.stdout.strip() or "unknown"


def _write_json(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n", encoding="utf-8")


def _write_artifact(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(yaml.safe_dump(data, sort_keys=True), encoding="utf-8")


def _default_run_id() -> str:
return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")


def _utc_now() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
Loading