diff --git a/.github/json/scripts/generate_checklist_json.py b/.github/json/scripts/generate_checklist_json.py
index 5f11b618b..c521551aa 100644
--- a/.github/json/scripts/generate_checklist_json.py
+++ b/.github/json/scripts/generate_checklist_json.py
@@ -5,8 +5,14 @@
 import os
 import re
 import sys
+import time
 from collections import OrderedDict
 from pathlib import Path
+from typing import Any
+from urllib.error import HTTPError, URLError
+from urllib.parse import quote
+from urllib.request import Request, urlopen
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 # Repo root (…/wstg)
 REPO_ROOT = Path(__file__).resolve().parents[3]
@@ -17,6 +23,317 @@
     "4-Web_Application_Security_Testing/"
 )
 
+OPENCRE_STANDARD = "OWASP Web Security Testing Guide (WSTG)"
+OPENCRE_BASE_URL = "https://www.opencre.org/rest/v1/standard"
+OPENCRE_LOOKUP_DESCRIPTION = (
+    "OpenCRE is queried with `GET /rest/v1/standard/?section=` (plus `&page=` when the section spans multiple pages)."
+)
+CRE_IDS_CELL_MAX_LEN = 240
+DEFAULT_CONCURRENCY_LIMIT = 4
+RETRY_COUNT = 3
+REQUEST_TIMEOUT = 30
+
+
+def get_concurrency_limit() -> int:
+    default_limit = min(os.cpu_count() or 1, DEFAULT_CONCURRENCY_LIMIT)
+    raw_value = os.environ.get("OPENCRE_CONCURRENCY")
+
+    if raw_value is None:
+        return default_limit
+
+    try:
+        return max(1, int(raw_value))
+    except ValueError:
+        return default_limit
+
+
+CONCURRENCY_LIMIT = get_concurrency_limit()
+
+
+def emit_markdown_report(text: str) -> None:
+    """Print markdown to stdout and append to GITHUB_STEP_SUMMARY when set."""
+    print(text, flush=True)
+    summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
+    if summary_path:
+        try:
+            with open(summary_path, "a", encoding="utf-8") as fh:
+                fh.write(text)
+        except OSError as exc:
+            print(
+                f"Warning: could not write GITHUB_STEP_SUMMARY: {exc}",
+                file=sys.stderr,
+            )
+
+
+class OpenCRELookupError(Exception):
+    """Raised when an OpenCRE request cannot be resolved."""
+
+
+def fetch_json_with_retry(url: str, retries: int = RETRY_COUNT) -> dict[str, Any]:
+    for attempt in range(retries):
+        try:
+            req = Request(
+                url,
+                headers={
+                    "Accept": "application/json",
+                    "User-Agent": (
+                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+                        "AppleWebKit/537.36 (KHTML, like Gecko) "
+                        "Chrome/137.0.0.0 Safari/537.36"
+                    ),
+                },
+            )
+
+            with urlopen(req, timeout=REQUEST_TIMEOUT) as response:
+                return json.loads(response.read().decode("utf-8"))
+
+        except HTTPError as e:
+            if e.code == 404:
+                raise OpenCRELookupError(f"OpenCRE returned 404 for {url}") from e
+
+            if attempt == retries - 1:
+                raise
+
+            time.sleep(2**attempt)
+
+        except URLError as e:
+            if attempt == retries - 1:
+                raise OpenCRELookupError(f"OpenCRE request failed for {url}: {e}") from e
+
+            time.sleep(2**attempt)
+
+        except Exception as e:
+            if attempt == retries - 1:
+                raise OpenCRELookupError(f"Unexpected error requesting {url}: {e}") from e
+
+            time.sleep(2**attempt)
+
+    raise OpenCRELookupError(f"Failed to fetch OpenCRE data after {retries} attempts: {url}")
+
+
+def extract_cre_ids(data: dict[str, Any], section_id: str) -> list[str]:
+    standards = data.get("standards", [])
+    if not isinstance(standards, list):
+        return []
+
+    cre_ids: list[str] = []
+
+    for item in standards:
+        if not isinstance(item, dict):
+            continue
+
+        if item.get("section") != section_id:
+            continue
+
+        links = item.get("links", [])
+        if not isinstance(links, list):
+            continue
+
+        for link in links:
+            if not isinstance(link, dict):
+                continue
+
+            document = link.get("document", {})
+            if not isinstance(document, dict):
+                continue
+
+            if document.get("doctype") == "CRE":
+                cre_id = document.get("id")
+                if cre_id:
+                    cre_ids.append(cre_id)
+
+    return list(dict.fromkeys(cre_ids))
+
+
+def fetch_mapping(test_id: str) -> tuple[str, list[str]]:
+    base_url = (
+        f"{OPENCRE_BASE_URL}/{quote(OPENCRE_STANDARD, safe='')}"
+        f"?section={quote(test_id, safe='')}"
+    )
+
+    first_page = fetch_json_with_retry(base_url)
+    all_cre_ids = extract_cre_ids(first_page, test_id)
+
+    total_pages = first_page.get("total_pages", 1)
+    if not isinstance(total_pages, int) or total_pages < 1:
+        total_pages = 1
+
+    for page in range(2, total_pages + 1):
+        paged_url = f"{base_url}&page={page}"
+        page_data = fetch_json_with_retry(paged_url)
+        all_cre_ids.extend(extract_cre_ids(page_data, test_id))
+
+    return test_id, list(dict.fromkeys(all_cre_ids))
+
+
+def load_existing_cre_ids(path: Path) -> dict[str, list[str]]:
+    if not path.exists():
+        return {}
+
+    try:
+        data = json.loads(path.read_text(encoding="utf-8"))
+    except (OSError, json.JSONDecodeError) as exc:
+        print(f"WARNING: Could not load existing checklist data from {path}: {exc}")
+        return {}
+
+    categories = data.get("categories", {})
+    if not isinstance(categories, dict):
+        return {}
+
+    existing_cre_ids: dict[str, list[str]] = {}
+
+    for category in categories.values():
+        if not isinstance(category, dict):
+            continue
+
+        tests = category.get("tests", [])
+        if not isinstance(tests, list):
+            continue
+
+        for test in tests:
+            if not isinstance(test, dict):
+                continue
+
+            test_id = test.get("id")
+            cre_ids = test.get("cre_ids")
+            if isinstance(test_id, str) and isinstance(cre_ids, list):
+                existing_cre_ids[test_id] = [
+                    cre_id for cre_id in cre_ids if isinstance(cre_id, str)
+                ]
+
+    return existing_cre_ids
+
+
+def _opencre_failure_is_404(message: str) -> bool:
+    return "404" in message
+
+
+def _opencre_failure_response_code(message: str) -> str:
+    """Best-effort HTTP status from OpenCRE error text; ``—`` when not present."""
+    m = re.search(r"returned (\d{3})\b", message)
+    if m:
+        return m.group(1)
+    if "404" in message:
+        return "404"
+    return "—"
+
+
+def _sort_opencre_failures_guide_order(
+    rows: list[tuple[str, str]], guide_rank: dict[str, int]
+) -> list[tuple[str, str]]:
+    """Order failures like the checklist (chapter order, then markdown file order)."""
+    sentinel = len(guide_rank) + 1
+    return sorted(
+        rows,
+        key=lambda r: (guide_rank.get(r[0], sentinel), r[0]),
+    )
+
+
+def _emit_opencre_failure_report(
+    failures: list[tuple[str, str]], guide_order_ids: list[str]
+) -> None:
+    if not failures:
+        return
+    guide_rank = {tid: i for i, tid in enumerate(guide_order_ids)}
+    lines: list[str] = [
+        "## Checklist JSON: OpenCRE lookup failures\n\n",
+        f"{OPENCRE_LOOKUP_DESCRIPTION}\n\n",
+        f"**{len(failures)}** WSTG test ID(s) could not be fetched from OpenCRE; "
+        "existing `cre_ids` in `checklist.json` are kept when present.\n\n",
+    ]
+    not_found = _sort_opencre_failures_guide_order(
+        [(tid, msg) for tid, msg in failures if _opencre_failure_is_404(msg)],
+        guide_rank,
+    )
+    other = _sort_opencre_failures_guide_order(
+        [(tid, msg) for tid, msg in failures if not _opencre_failure_is_404(msg)],
+        guide_rank,
+    )
+
+    def append_table(title: str, rows: list[tuple[str, str]]) -> None:
+        lines.append(f"### {title}\n\n")
+        if not rows:
+            lines.append("_None._\n\n")
+            return
+        lines.append("| WSTG ID | Response Code |\n")
+        lines.append("| --- | --- |\n")
+        for tid, msg in rows:
+            code = _opencre_failure_response_code(msg)
+            lines.append(f"| `{tid}` | {code} |\n")
+        lines.append("\n")
+
+    append_table(f"HTTP 404 ({len(not_found)})", not_found)
+    append_table(f"Other errors ({len(other)})", other)
+    emit_markdown_report("".join(lines))
+
+
+def enrich_with_opencre(checklist: OrderedDict) -> OrderedDict:
+    all_tests = []
+    existing_cre_ids = load_existing_cre_ids(OUTPUT_PATH)
+
+    categories = checklist.get("categories", {})
+    if isinstance(categories, dict):
+        for category in categories.values():
+            if not isinstance(category, dict):
+                continue
+
+            tests = category.get("tests", [])
+            if isinstance(tests, list):
+                all_tests.extend(tests)
+
+    unique_ids = list(
+        dict.fromkeys(
+            test.get("id")
+            for test in all_tests
+            if isinstance(test, dict) and test.get("id")
+        )
+    )
+
+    results: dict[str, list[str] | None] = {}
+    failures: list[tuple[str, str]] = []
+
+    with ThreadPoolExecutor(max_workers=CONCURRENCY_LIMIT) as executor:
+        futures = {
+            executor.submit(fetch_mapping, test_id): test_id
+            for test_id in unique_ids
+        }
+
+        for future in as_completed(futures):
+            test_id = futures[future]
+            try:
+                returned_id, cre_ids = future.result()
+                results[returned_id] = cre_ids
+            except Exception as exc:
+                message = str(exc)
+                results[test_id] = None
+                failures.append((test_id, message))
+
+    _emit_opencre_failure_report(failures, unique_ids)
+
+    for test in all_tests:
+        if not isinstance(test, dict):
+            continue
+
+        test_id = test.get("id")
+        next_ids = results.get(test_id)
+
+        if next_ids is None:
+            prior_ids = existing_cre_ids.get(test_id)
+            if prior_ids:
+                test["cre_ids"] = prior_ids
+            continue
+
+        if not next_ids:
+            if "cre_ids" in test:
+                del test["cre_ids"]
+            continue
+
+        if test.get("cre_ids") != next_ids:
+            test["cre_ids"] = next_ids
+
+    return checklist
+
 
 def category_label_from_dirname(dirname: str) -> str | None:
     """Human-readable category name from a chapter folder (e.g. ``01-Information_Gathering``)."""
@@ -67,6 +384,15 @@ def title_from_h1_prefix(content: str) -> str:
     return first[2:] if len(first) >= 2 else first
 
 
+def is_removed_placeholder_document(content: str) -> bool:
+    """True for stub markdown pages that only retain a removal notice."""
+    meaningful_lines = [line.strip() for line in content.splitlines() if line.strip()]
+    if len(meaningful_lines) < 4:
+        return False
+    last_meaningful_line = meaningful_lines[-1].rstrip().rstrip(".")
+    return last_meaningful_line == "This content has been removed"
+
+
 def _nonblank_lines_in_objectives_section(content: str) -> list[str]:
     """Lines under ``## Test Objectives`` until the next ``## `` heading; skips blank lines."""
     lines = content.splitlines()
@@ -154,21 +480,22 @@ def _empty_objective_entries(
 
 
 def _write_empty_objectives_report(entries: list[tuple[str, str, str, str]]) -> None:
-    """
-    In GitHub Actions, append to the job summary. Locally, print to stderr.
-    Never raises for missing env or IO errors beyond logging to stderr.
- """ + """Build markdown for Test Objectives quality; emit to stdout and job summary.""" lines: list[str] = [] if not entries: - lines.append("## Checklist JSON: Test Objectives\n\n") + lines.append("## Checklist JSON: WSTG markdown — Test Objectives\n\n") lines.append( - "All generated entries have at least one non-blank objective.\n" + "All checklist rows include at least one non-blank objective parsed from " + "each chapter's `## Test Objectives` section.\n" ) else: - lines.append("## Checklist JSON: empty or blank Test Objectives\n\n") lines.append( - "These IDs have no non-blank objective strings; the Excel builder " - "will show **N/A** for objectives.\n\n" + "## Checklist JSON: WSTG markdown — empty or blank Test Objectives\n\n" + ) + lines.append( + "These rows have empty or whitespace-only objectives in JSON (from each " + "chapter's `## Test Objectives` section). The Excel builder shows **N/A** " + "for the objective column.\n\n" ) lines.append("| Category | ID | Name |\n") lines.append("| --- | --- | --- |\n") @@ -177,20 +504,65 @@ def _write_empty_objectives_report(entries: list[tuple[str, str, str, str]]) -> safe_name = name.replace("|", "\\|") lines.append(f"| {safe_cat} | `{tid}` | {safe_name} |\n") - text = "".join(lines) - summary_path = os.environ.get("GITHUB_STEP_SUMMARY") - if summary_path: - try: - with open(summary_path, "a", encoding="utf-8") as fh: - fh.write(text) - except OSError as exc: - print( - f"Warning: could not write GITHUB_STEP_SUMMARY: {exc}", - file=sys.stderr, - ) - print(text, file=sys.stderr) - else: - print(text, file=sys.stderr) + emit_markdown_report("".join(lines)) + + +def _cre_mapping_success_rows( + data: OrderedDict, +) -> list[tuple[str, str, str, str]]: + """(category_label, test_id, test_name, cre_joined) for tests with non-empty cre_ids.""" + rows: list[tuple[str, str, str, str]] = [] + categories = data.get("categories", {}) + if not isinstance(categories, dict): + return rows + for category_label, category in categories.items(): + if not isinstance(category, dict): + continue + tests = category.get("tests", []) + if not isinstance(tests, list): + continue + for test in tests: + if not isinstance(test, dict): + continue + cre_ids = test.get("cre_ids") + if not isinstance(cre_ids, list) or not cre_ids: + continue + parts = [str(x) for x in cre_ids if isinstance(x, str) and x] + if not parts: + continue + joined = ", ".join(parts) + if len(joined) > CRE_IDS_CELL_MAX_LEN: + joined = joined[: CRE_IDS_CELL_MAX_LEN - 1] + "…" + tid = test.get("id", "") + name = test.get("name", "") + if not isinstance(tid, str): + tid = str(tid) + if not isinstance(name, str): + name = str(name) + rows.append((str(category_label), tid, name, joined)) + rows.sort(key=lambda r: (r[0], r[1])) + return rows + + +def _write_cre_mapping_success_report(data: OrderedDict) -> None: + rows = _cre_mapping_success_rows(data) + lines: list[str] = ["## Checklist JSON: OpenCRE mappings (success)\n\n"] + if not rows: + lines.append("No tests have non-empty `cre_ids` after this run.\n") + emit_markdown_report("".join(lines)) + return + lines.append( + f"{OPENCRE_LOOKUP_DESCRIPTION}\n\n" + f"**{len(rows)}** checklist row(s) have at least one CRE id from OpenCRE.\n\n" + ) + lines.append("| Category | ID | Name | CRE IDs |\n") + lines.append("| --- | --- | --- | --- |\n") + for category, tid, name, cre_cell in rows: + safe_cat = category.replace("|", "\\|") + safe_name = name.replace("|", "\\|") + safe_cre = cre_cell.replace("|", "\\|") + lines.append(f"| {safe_cat} | `{tid}` | 
+        lines.append(f"| {safe_cat} | `{tid}` | {safe_name} | {safe_cre} |\n")
+    emit_markdown_report("".join(lines))
 
 
 def build_checklist() -> OrderedDict:
@@ -212,6 +584,8 @@ def build_checklist() -> OrderedDict:
         if md_path.name == "README.md":
             continue
         text = md_path.read_text(encoding="utf-8")
+        if is_removed_placeholder_document(text):
+            continue
        tid = first_wstg_id_in_document(text)
         if not tid:
             continue
@@ -236,6 +610,8 @@ def build_checklist() -> OrderedDict:
 
 def main() -> None:
     data = build_checklist()
+    data = enrich_with_opencre(data)
+    _write_cre_mapping_success_report(data)
     _write_empty_objectives_report(_empty_objective_entries(data))
     OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
     text = json.dumps(data, indent=2, ensure_ascii=False) + "\n"