-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Feature/opencre clean #1404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/opencre clean #1404
Changes from 1 commit
9550a64
74e688f
b307959
d681705
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,8 +5,14 @@ | |
| import os | ||
| import re | ||
| import sys | ||
| import time | ||
| from collections import OrderedDict | ||
| from pathlib import Path | ||
| from typing import Any | ||
| from urllib.error import HTTPError, URLError | ||
| from urllib.parse import quote | ||
| from urllib.request import Request, urlopen | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
|
|
||
| # Repo root (…/wstg) | ||
| REPO_ROOT = Path(__file__).resolve().parents[3] | ||
|
|
@@ -17,6 +23,231 @@ | |
| "4-Web_Application_Security_Testing/" | ||
| ) | ||
|
|
||
| OPENCRE_STANDARD = "OWASP Web Security Testing Guide (WSTG)" | ||
| OPENCRE_BASE_URL = "https://www.opencre.org/rest/v1/standard" | ||
| DEFAULT_CONCURRENCY_LIMIT = 4 | ||
| RETRY_COUNT = 3 | ||
| REQUEST_TIMEOUT = 30 | ||
|
|
||
| CONCURRENCY_LIMIT = int( | ||
| os.environ.get( | ||
| "OPENCRE_CONCURRENCY", | ||
| min(os.cpu_count() or 1, DEFAULT_CONCURRENCY_LIMIT), | ||
| ) | ||
| ) | ||
|
|
||
|
|
||
| class OpenCRELookupError(Exception): | ||
| """Raised when an OpenCRE request cannot be resolved.""" | ||
|
|
||
|
|
||
| def fetch_json_with_retry(url: str, retries: int = RETRY_COUNT) -> dict[str, Any]: | ||
| for attempt in range(retries): | ||
| try: | ||
| req = Request( | ||
| url, | ||
| headers={ | ||
| "Accept": "application/json", | ||
| "User-Agent": ( | ||
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " | ||
| "AppleWebKit/537.36 (KHTML, like Gecko) " | ||
| "Chrome/137.0.0.0 Safari/537.36" | ||
| ), | ||
|
kingthorin marked this conversation as resolved.
|
||
| }, | ||
|
kingthorin marked this conversation as resolved.
|
||
| ) | ||
|
|
||
| with urlopen(req, timeout=REQUEST_TIMEOUT) as response: | ||
| return json.loads(response.read().decode("utf-8")) | ||
|
|
||
| except HTTPError as e: | ||
| if e.code == 404: | ||
| raise OpenCRELookupError(f"OpenCRE returned 404 for {url}") from e | ||
|
|
||
|
Comment on lines
+91
to
+94
|
||
| if attempt == retries - 1: | ||
| raise | ||
|
|
||
| time.sleep(2**attempt) | ||
|
|
||
| except URLError as e: | ||
| if attempt == retries - 1: | ||
| raise OpenCRELookupError(f"OpenCRE request failed for {url}: {e}") from e | ||
|
|
||
| time.sleep(2**attempt) | ||
|
|
||
| except Exception as e: | ||
| if attempt == retries - 1: | ||
| raise OpenCRELookupError(f"Unexpected error requesting {url}: {e}") from e | ||
|
|
||
| time.sleep(2**attempt) | ||
|
|
||
| raise OpenCRELookupError(f"Failed to fetch OpenCRE data after {retries} attempts: {url}") | ||
|
|
||
|
|
||
| def extract_cre_ids(data: dict[str, Any], section_id: str) -> list[str]: | ||
| standards = data.get("standards", []) | ||
| if not isinstance(standards, list): | ||
| return [] | ||
|
|
||
| cre_ids: list[str] = [] | ||
|
|
||
| for item in standards: | ||
| if not isinstance(item, dict): | ||
| continue | ||
|
|
||
| if item.get("section") != section_id: | ||
| continue | ||
|
|
||
| links = item.get("links", []) | ||
| if not isinstance(links, list): | ||
| continue | ||
|
|
||
| for link in links: | ||
| if not isinstance(link, dict): | ||
| continue | ||
|
|
||
| document = link.get("document", {}) | ||
| if not isinstance(document, dict): | ||
| continue | ||
|
|
||
| if document.get("doctype") == "CRE": | ||
| cre_id = document.get("id") | ||
| if cre_id: | ||
| cre_ids.append(cre_id) | ||
|
|
||
| return list(dict.fromkeys(cre_ids)) | ||
|
|
||
|
|
||
| def fetch_mapping(test_id: str) -> tuple[str, list[str]]: | ||
| base_url = ( | ||
| f"{OPENCRE_BASE_URL}/{quote(OPENCRE_STANDARD, safe='')}" | ||
| f"?section={quote(test_id, safe='')}" | ||
| ) | ||
|
|
||
| first_page = fetch_json_with_retry(base_url) | ||
| all_cre_ids = extract_cre_ids(first_page, test_id) | ||
|
|
||
| total_pages = first_page.get("total_pages", 1) | ||
| if not isinstance(total_pages, int) or total_pages < 1: | ||
| total_pages = 1 | ||
|
|
||
| for page in range(2, total_pages + 1): | ||
| paged_url = f"{base_url}&page={page}" | ||
| page_data = fetch_json_with_retry(paged_url) | ||
| all_cre_ids.extend(extract_cre_ids(page_data, test_id)) | ||
|
|
||
| return test_id, list(dict.fromkeys(all_cre_ids)) | ||
|
|
||
|
|
||
| def load_existing_cre_ids(path: Path) -> dict[str, list[str]]: | ||
| if not path.exists(): | ||
| return {} | ||
|
|
||
| try: | ||
| data = json.loads(path.read_text(encoding="utf-8")) | ||
| except (OSError, json.JSONDecodeError) as exc: | ||
| print(f"WARNING: Could not load existing checklist data from {path}: {exc}") | ||
| return {} | ||
|
|
||
| categories = data.get("categories", {}) | ||
| if not isinstance(categories, dict): | ||
| return {} | ||
|
|
||
| existing_cre_ids: dict[str, list[str]] = {} | ||
|
|
||
| for category in categories.values(): | ||
| if not isinstance(category, dict): | ||
| continue | ||
|
|
||
| tests = category.get("tests", []) | ||
| if not isinstance(tests, list): | ||
| continue | ||
|
|
||
| for test in tests: | ||
| if not isinstance(test, dict): | ||
| continue | ||
|
|
||
| test_id = test.get("id") | ||
| cre_ids = test.get("cre_ids") | ||
| if isinstance(test_id, str) and isinstance(cre_ids, list): | ||
| existing_cre_ids[test_id] = [ | ||
| cre_id for cre_id in cre_ids if isinstance(cre_id, str) | ||
| ] | ||
|
|
||
| return existing_cre_ids | ||
|
|
||
|
|
||
| def enrich_with_opencre(checklist: OrderedDict) -> OrderedDict: | ||
| all_tests = [] | ||
| existing_cre_ids = load_existing_cre_ids(OUTPUT_PATH) | ||
|
|
||
| categories = checklist.get("categories", {}) | ||
| if isinstance(categories, dict): | ||
| for category in categories.values(): | ||
| if not isinstance(category, dict): | ||
| continue | ||
|
|
||
| tests = category.get("tests", []) | ||
| if isinstance(tests, list): | ||
| all_tests.extend(tests) | ||
|
|
||
| unique_ids = list( | ||
| dict.fromkeys( | ||
| test.get("id") | ||
| for test in all_tests | ||
| if isinstance(test, dict) and test.get("id") | ||
| ) | ||
| ) | ||
|
|
||
| results: dict[str, list[str] | None] = {} | ||
| failures: list[tuple[str, str]] = [] | ||
|
|
||
| with ThreadPoolExecutor(max_workers=CONCURRENCY_LIMIT) as executor: | ||
| futures = { | ||
| executor.submit(fetch_mapping, test_id): test_id | ||
| for test_id in unique_ids | ||
| } | ||
|
|
||
| for future in as_completed(futures): | ||
| test_id = futures[future] | ||
| try: | ||
| returned_id, cre_ids = future.result() | ||
| results[returned_id] = cre_ids | ||
| except Exception as exc: | ||
| message = str(exc) | ||
| print(f"WARNING: OpenCRE lookup failed for {test_id}: {message}") | ||
| results[test_id] = None | ||
| failures.append((test_id, message)) | ||
|
|
||
| if failures: | ||
| print( | ||
| "WARNING: OpenCRE enrichment failures for the following test IDs:" | ||
| ) | ||
| for failed_id, message in failures: | ||
| print(f" - {failed_id}: {message}") | ||
|
|
||
| for test in all_tests: | ||
| if not isinstance(test, dict): | ||
| continue | ||
|
|
||
| test_id = test.get("id") | ||
| next_ids = results.get(test_id) | ||
|
|
||
| if next_ids is None: | ||
| prior_ids = existing_cre_ids.get(test_id) | ||
| if prior_ids: | ||
| test["cre_ids"] = prior_ids | ||
| continue | ||
|
|
||
|
kingthorin marked this conversation as resolved.
|
||
| if not next_ids: | ||
| if "cre_ids" in test: | ||
| del test["cre_ids"] | ||
| continue | ||
|
|
||
| if test.get("cre_ids") != next_ids: | ||
| test["cre_ids"] = next_ids | ||
|
|
||
| return checklist | ||
|
|
||
|
|
||
| def category_label_from_dirname(dirname: str) -> str | None: | ||
| """Human-readable category name from a chapter folder (e.g. ``01-Information_Gathering``).""" | ||
|
|
@@ -236,6 +467,7 @@ def build_checklist() -> OrderedDict: | |
|
|
||
| def main() -> None: | ||
| data = build_checklist() | ||
| data = enrich_with_opencre(data) | ||
| _write_empty_objectives_report(_empty_objective_entries(data)) | ||
| OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) | ||
| text = json.dumps(data, indent=2, ensure_ascii=False) + "\n" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.